You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/06 15:29:27 UTC
svn commit: r1032071 - in /camel/trunk/components/camel-quickfix/src:
main/java/org/apache/camel/component/quickfixj/
test/java/org/apache/camel/component/quickfixj/
Author: davsclaus
Date: Sat Nov 6 14:29:27 2010
New Revision: 1032071
URL: http://svn.apache.org/viewvc?rev=1032071&view=rev
Log:
CAMEL-3318: Applied patch with thanks to Steve Bate.
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1032071&r1=1032070&r2=1032071&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Sat Nov 6 14:29:27 2010
@@ -25,6 +25,10 @@ import org.apache.camel.impl.DefaultComp
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import quickfix.LogFactory;
+import quickfix.MessageFactory;
+import quickfix.MessageStoreFactory;
+
public class QuickfixjComponent extends DefaultComponent {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
@@ -32,6 +36,10 @@ public class QuickfixjComponent extends
private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>();
private final Map<String, QuickfixjEndpoint> endpoints = new HashMap<String, QuickfixjEndpoint>();
+ private MessageStoreFactory messageStoreFactory;
+ private LogFactory logFactory;
+ private MessageFactory messageFactory;
+
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
// Look up the engine instance based on the settings file ("remaining")
@@ -43,7 +51,7 @@ public class QuickfixjComponent extends
engine = engines.get(remaining);
if (engine == null) {
LOG.info("Creating QuickFIX/J engine using settings: " + remaining);
- engine = new QuickfixjEngine(remaining, false);
+ engine = new QuickfixjEngine(remaining, false, messageStoreFactory, logFactory, messageFactory);
engines.put(remaining, engine);
if (isStarted()) {
startQuickfixjEngine(engine);
@@ -90,4 +98,16 @@ public class QuickfixjComponent extends
Map<String, QuickfixjEngine> getEngines() {
return Collections.unmodifiableMap(engines);
}
+
+ public void setMessageFactory(MessageFactory messageFactory) {
+ this.messageFactory = messageFactory;
+ }
+
+ public void setLogFactory(LogFactory logFactory) {
+ this.logFactory = logFactory;
+ }
+
+ public void setMessageStoreFactory(MessageStoreFactory messageStoreFactory) {
+ this.messageStoreFactory = messageStoreFactory;
+ }
}
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1032071&r1=1032070&r2=1032071&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Sat Nov 6 14:29:27 2010
@@ -52,6 +52,7 @@ import quickfix.Message;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.RejectLogon;
+import quickfix.SLF4JLogFactory;
import quickfix.ScreenLogFactory;
import quickfix.Session;
import quickfix.SessionFactory;
@@ -100,7 +101,16 @@ public class QuickfixjEngine {
ThreadPerConnector, ThreadPerSession;
}
- public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown) throws ConfigError, FieldConvertError, IOException, JMException {
+ public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown)
+ throws ConfigError, FieldConvertError, IOException, JMException {
+
+ this(settingsResourceName, forcedShutdown, null, null, null);
+ }
+
+ public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown,
+ MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride, MessageFactory messageFactoryOverride)
+ throws ConfigError, FieldConvertError, IOException, JMException {
+
this.forcedShutdown = forcedShutdown;
this.settingsResourceName = settingsResourceName;
@@ -111,10 +121,17 @@ public class QuickfixjEngine {
SessionSettings settings = new SessionSettings(inputStream);
- // TODO Make the message factory configurable for advanced users
- messageFactory = new DefaultMessageFactory();
- sessionLogFactory = inferLogFactory(settings);
- messageStoreFactory = inferMessageStoreFactory(settings);
+ messageFactory = messageFactoryOverride != null
+ ? messageFactoryOverride
+ : new DefaultMessageFactory();
+
+ sessionLogFactory = sessionLogFactoryOverride != null
+ ? sessionLogFactoryOverride
+ : inferLogFactory(settings);
+
+ messageStoreFactory = messageStoreFactoryOverride != null
+ ? messageStoreFactoryOverride
+ : inferMessageStoreFactory(settings);
// Set default session schedule if not specified in configuration
if (!settings.isSetting(Session.SETTING_START_TIME)) {
@@ -276,6 +293,7 @@ public class QuickfixjEngine {
isFileLog(settings, impliedLogFactories);
isScreenLog(settings, impliedLogFactories);
isJdbcLog(settings, impliedLogFactories);
+ isSL4JLog(settings, impliedLogFactories);
if (impliedLogFactories.size() > 1) {
throw new ConfigError("Ambiguous log factory implied in configuration");
}
@@ -310,6 +328,17 @@ public class QuickfixjEngine {
}
}
+ private void isSL4JLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+ if (impliedLogFactories.size() == 0) {
+ for (Object key : settings.getDefaultProperties().keySet()) {
+ if (key.toString().startsWith("SLF4J")) {
+ impliedLogFactories.add(new SLF4JLogFactory(settings));
+ return;
+ }
+ }
+ }
+ }
+
private boolean isConnectorRole(SessionSettings settings, String connectorRole) throws ConfigError {
boolean hasRole = false;
Iterator<SessionID> sessionIdItr = settings.sectionIterator();
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1032071&r1=1032070&r2=1032071&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Sat Nov 6 14:29:27 2010
@@ -20,6 +20,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.CountDownLatch;
@@ -42,8 +43,14 @@ import org.junit.Before;
import org.junit.Test;
import quickfix.Acceptor;
+import quickfix.DefaultMessageFactory;
import quickfix.FixVersions;
import quickfix.Initiator;
+import quickfix.LogFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.MessageFactory;
+import quickfix.MessageStoreFactory;
+import quickfix.ScreenLogFactory;
import quickfix.Session;
import quickfix.SessionFactory;
import quickfix.SessionID;
@@ -68,7 +75,10 @@ public class QuickfixjComponentTest {
private SessionID sessionID;
private SessionSettings settings;
private QuickfixjComponent component;
-
+ private MessageFactory engineMessageFactory;
+ private MessageStoreFactory engineMessageStoreFactory;
+ private LogFactory engineLogFactory;
+
private void setSessionID(SessionSettings sessionSettings, SessionID sessionID) {
sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, sessionID.getBeginString());
sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, sessionID.getSenderCompID());
@@ -88,10 +98,6 @@ public class QuickfixjComponentTest {
settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
tempdir = settingsFile.getParentFile();
URL[] urls = new URL[] {tempdir.toURI().toURL()};
-
- contextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
- Thread.currentThread().setContextClassLoader(testClassLoader);
sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
@@ -101,9 +107,30 @@ public class QuickfixjComponentTest {
settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
setSessionID(settings, sessionID);
+ contextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+ Thread.currentThread().setContextClassLoader(testClassLoader);
+ }
+
+ private void setUpComponent() throws IOException, MalformedURLException, NoSuchMethodException {
+ setUpComponent(false);
+ }
+
+ private void setUpComponent(boolean injectQfjPlugins) throws IOException, MalformedURLException, NoSuchMethodException {
DefaultCamelContext camelContext = new DefaultCamelContext();
component = new QuickfixjComponent();
component.setCamelContext(camelContext);
+
+ if (injectQfjPlugins) {
+ engineMessageFactory = new DefaultMessageFactory();
+ engineMessageStoreFactory = new MemoryStoreFactory();
+ engineLogFactory = new ScreenLogFactory();
+
+ component.setMessageFactory(engineMessageFactory);
+ component.setMessageStoreFactory(engineMessageStoreFactory);
+ component.setLogFactory(engineLogFactory);
+ }
+
assertThat(component.getEngines().size(), is(0));
Method converterMethod = QuickfixjConverters.class.getMethod("toSessionID", new Class<?>[] {String.class});
@@ -113,11 +140,15 @@ public class QuickfixjComponentTest {
@After
public void tearDown() throws Exception {
Thread.currentThread().setContextClassLoader(contextClassLoader);
- component.stop();
+ if (component != null) {
+ component.stop();
+ }
}
@Test
public void createEndpointBeforeComponentStart() throws Exception {
+ setUpComponent();
+
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
@@ -147,6 +178,8 @@ public class QuickfixjComponentTest {
@Test
public void createEndpointAfterComponentStart() throws Exception {
+ setUpComponent();
+
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
@@ -171,6 +204,8 @@ public class QuickfixjComponentTest {
@Test
public void componentStop() throws Exception {
+ setUpComponent();
+
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
@@ -204,6 +239,8 @@ public class QuickfixjComponentTest {
@Test
public void messagePublication() throws Exception {
+ setUpComponent();
+
// Create settings file with both acceptor and initiator
SessionSettings settings = new SessionSettings();
@@ -268,6 +305,27 @@ public class QuickfixjComponentTest {
assertTrue("Messages not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
}
+ @Test
+ public void userSpecifiedQuickfixjPlugins() throws Exception {
+ setUpComponent(true);
+
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+ writeSettings();
+
+ component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+
+ component.start();
+
+ assertThat(component.getEngines().size(), is(1));
+ QuickfixjEngine engine = component.getEngines().values().iterator().next();
+
+ assertThat(engine.getMessageFactory(), is(engineMessageFactory));
+ assertThat(engine.getMessageStoreFactory(), is(engineMessageStoreFactory));
+ assertThat(engine.getLogFactory(), is(engineLogFactory));
+ }
+
private void writeSettings() throws IOException {
writeSettings(settings);
}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java?rev=1032071&r1=1032070&r2=1032071&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Sat Nov 6 14:29:27 2010
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -34,6 +35,8 @@ import org.apache.mina.common.TransportT
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+
import quickfix.Acceptor;
import quickfix.ConfigError;
import quickfix.DefaultMessageFactory;
@@ -46,8 +49,12 @@ import quickfix.Initiator;
import quickfix.JdbcLogFactory;
import quickfix.JdbcSetting;
import quickfix.JdbcStoreFactory;
+import quickfix.LogFactory;
import quickfix.MemoryStoreFactory;
import quickfix.Message;
+import quickfix.MessageFactory;
+import quickfix.MessageStoreFactory;
+import quickfix.SLF4JLogFactory;
import quickfix.ScreenLogFactory;
import quickfix.Session;
import quickfix.SessionFactory;
@@ -71,7 +78,6 @@ import static org.junit.Assert.assertTha
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
public class QuickfixjEngineTest {
private File settingsFile;
private ClassLoader contextClassLoader;
@@ -287,6 +293,23 @@ public class QuickfixjEngineTest {
}
@Test
+ public void inferSlf4jLog() throws Exception {
+ settings.setString(SLF4JLogFactory.SETTING_EVENT_CATEGORY, "Events");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(SLF4JLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
public void ambiguousLog() throws Exception {
settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
@@ -307,6 +330,24 @@ public class QuickfixjEngineTest {
}
@Test
+ public void useExplicitComponentImplementations() throws Exception {
+ settings.setString(SLF4JLogFactory.SETTING_EVENT_CATEGORY, "Events");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ MessageStoreFactory messageStoreFactory = Mockito.mock(MessageStoreFactory.class);
+ LogFactory logFactory = Mockito.mock(LogFactory.class);
+ MessageFactory messageFactory = Mockito.mock(MessageFactory.class);
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false, messageStoreFactory, logFactory, messageFactory);
+
+ assertThat(quickfixjEngine.getMessageStoreFactory(), is(messageStoreFactory));
+ assertThat(quickfixjEngine.getLogFactory(), is(logFactory));
+ assertThat(quickfixjEngine.getMessageFactory(), is(messageFactory));
+ }
+
+ @Test
public void enableJmxForInitiator() throws Exception {
settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);