You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/11 12:13:03 UTC
[1/4] git commit: CAMEL-7184 Allow lazy creating QuickfixJ engines
Updated Branches:
refs/heads/master 1099b8ce9 -> 16f112aa0
CAMEL-7184 Allow lazy creating QuickfixJ engines
Introduced "lazyCreateEngines" component setting and "lazyCreateEngine"
endpoint URI parameter.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6f1bc4e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6f1bc4e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6f1bc4e
Branch: refs/heads/master
Commit: e6f1bc4e923810aa789425fb2bb86b1e3e8f1f81
Parents: 9e383f4
Author: Grzegorz Grzybek <gg...@redhat.com>
Authored: Tue Feb 11 11:04:00 2014 +0100
Committer: Grzegorz Grzybek <gg...@redhat.com>
Committed: Tue Feb 11 11:04:00 2014 +0100
----------------------------------------------------------------------
.../component/quickfixj/QuickfixjComponent.java | 36 ++++++-
.../component/quickfixj/QuickfixjEndpoint.java | 14 +++
.../component/quickfixj/QuickfixjEngine.java | 70 +++++++++++--
.../component/quickfixj/QuickfixjProducer.java | 1 +
.../quickfixj/QuickfixjComponentTest.java | 100 ++++++++++++++++++-
.../quickfixj/QuickfixjLazyProducerTest.java | 85 ++++++++++++++++
.../quickfixj/QuickfixjSpringTest.java | 7 ++
.../camel/component/quickfixj/TestSupport.java | 8 +-
.../quickfixj/QuickfixjSpringTest-context.xml | 20 ++++
9 files changed, 321 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
index 03b89cd..890d1d7 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
@@ -26,6 +26,7 @@ import org.apache.camel.StartupListener;
import org.apache.camel.impl.DefaultComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
@@ -33,6 +34,7 @@ import quickfix.SessionSettings;
public class QuickfixjComponent extends DefaultComponent implements StartupListener {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
+ private static final String PARAMETER_LAZY_CREATE_ENGINE = "lazyCreateEngine";
private final Object engineInstancesLock = new Object();
private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>();
@@ -43,6 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
private LogFactory logFactory;
private MessageFactory messageFactory;
private Map<String, QuickfixjConfiguration> configurations = new HashMap<String, QuickfixjConfiguration>();
+ private boolean lazyCreateEngines = false;
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -58,12 +61,17 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
}
if (engine == null) {
QuickfixjConfiguration configuration = configurations.get(remaining);
+ SessionSettings settings = null;
if (configuration != null) {
- SessionSettings settings = configuration.createSessionSettings();
- engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory);
+ settings = configuration.createSessionSettings();
} else {
- engine = new QuickfixjEngine(uri, remaining, messageStoreFactory, logFactory, messageFactory);
+ settings = QuickfixjEngine.loadSettings(remaining);
}
+ Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE);
+ if (lazyCreateEngineForEndpoint == null)
+ lazyCreateEngineForEndpoint = isLazyCreateEngines();
+ engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory,
+ lazyCreateEngineForEndpoint);
// only start engine if CamelContext is already started, otherwise the engines gets started
// automatic later when CamelContext has been started using the StartupListener
@@ -112,8 +120,12 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
}
private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception {
- LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
- engine.start();
+ if (!engine.isLazy()) {
+ LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
+ engine.start();
+ } else {
+ LOG.info("QuickFIX/J engine: {} will start lazily", engine.getUri());
+ }
}
// Test Support
@@ -153,6 +165,20 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
this.configurations = configurations;
}
+ public boolean isLazyCreateEngines() {
+ return this.lazyCreateEngines;
+ }
+
+ /**
+ * If set to <code>true</code>, the engines will be created and started when needed (when the first
+ * message is sent)
+ *
+ * @param lazyCreateEngines
+ */
+ public void setLazyCreateEngines(boolean lazyCreateEngines) {
+ this.lazyCreateEngines = lazyCreateEngines;
+ }
+
@Override
public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
// only start quickfix engines when CamelContext have finished starting
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 44a117d..09f0822 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -136,6 +136,20 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent
return true;
}
+ /**
+ * Initializes and starts the engine if it has not been initialized yet.
+ */
+ public void ensureInitialized() throws Exception {
+ if (!engine.isInitialized()) {
+ synchronized (engine) {
+ if (!engine.isInitialized()) {
+ engine.initializeEngine();
+ engine.start();
+ }
+ }
+ }
+ }
+
public QuickfixjEngine getEngine() {
return engine;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
index 673fd51..c1fb9ff 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;
import javax.management.ObjectName;
@@ -87,17 +88,20 @@ public class QuickfixjEngine extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);
- private final Acceptor acceptor;
- private final Initiator initiator;
- private final JmxExporter jmxExporter;
- private final MessageStoreFactory messageStoreFactory;
- private final LogFactory sessionLogFactory;
- private final MessageFactory messageFactory;
+ private Acceptor acceptor;
+ private Initiator initiator;
+ private JmxExporter jmxExporter;
+ private MessageStoreFactory messageStoreFactory;
+ private LogFactory sessionLogFactory;
+ private MessageFactory messageFactory;
private final MessageCorrelator messageCorrelator = new MessageCorrelator();
private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
private final String uri;
private ObjectName acceptorObjectName;
private ObjectName initiatorObjectName;
+ private final SessionSettings settings;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private boolean lazy = false;
public enum ThreadModel {
ThreadPerConnector, ThreadPerSession;
@@ -148,13 +152,48 @@ public class QuickfixjEngine extends ServiceSupport {
public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
+ this(uri, settings, messageStoreFactoryOverride, sessionLogFactoryOverride, messageFactoryOverride, false);
+ }
+
+ public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
+ MessageFactory messageFactoryOverride, boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException {
addEventListener(messageCorrelator);
this.uri = uri;
-
- messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory();
- sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings);
- messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings);
+ this.lazy = lazy;
+ this.settings = settings;
+
+ // overrides
+ if (messageFactoryOverride != null) {
+ messageFactory = messageFactoryOverride;
+ }
+ if (sessionLogFactoryOverride != null) {
+ sessionLogFactory = sessionLogFactoryOverride;
+ }
+ if (messageStoreFactoryOverride != null) {
+ messageStoreFactory = messageStoreFactoryOverride;
+ }
+
+ if (!lazy) {
+ initializeEngine();
+ }
+ }
+
+ /**
+ * Initializes the engine on demand. May be called immediately in constructor or when needed.
+ * If initializing later, it should be started afterwards.
+ */
+ void initializeEngine() throws ConfigError,
+ FieldConvertError, JMException {
+ if (messageFactory == null) {
+ messageFactory = new DefaultMessageFactory();
+ }
+ if (sessionLogFactory == null) {
+ sessionLogFactory = inferLogFactory(settings);
+ }
+ if (messageStoreFactory == null) {
+ messageStoreFactory = inferMessageStoreFactory(settings);
+ }
// Set default session schedule if not specified in configuration
if (!settings.isSetting(Session.SETTING_START_TIME)) {
@@ -208,9 +247,10 @@ public class QuickfixjEngine extends ServiceSupport {
} finally {
Thread.currentThread().setContextClassLoader(ccl);
}
+ initialized.set(true);
}
- private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
+ static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
if (inputStream == null) {
throw new IllegalArgumentException("Could not load " + settingsResourceName);
@@ -507,6 +547,14 @@ public class QuickfixjEngine extends ServiceSupport {
return messageCorrelator;
}
+ public boolean isInitialized() {
+ return this.initialized.get();
+ }
+
+ public boolean isLazy() {
+ return this.lazy;
+ }
+
// For Testing
Initiator getInitiator() {
return initiator;
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
index 4114cc0..ff262c0 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
@@ -43,6 +43,7 @@ public class QuickfixjProducer extends DefaultProducer {
@Override
public void process(Exchange exchange) throws Exception {
try {
+ getEndpoint().ensureInitialized();
sendMessage(exchange, exchange.getIn());
} catch (Exception e) {
exchange.setException(e);
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
index 532a9b6..a8a369b 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
@@ -168,6 +168,7 @@ public class QuickfixjComponentTest {
Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getProvisionalEngines().size(), is(1));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
+ assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
@@ -178,6 +179,7 @@ public class QuickfixjComponentTest {
Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile2.getName(), null));
assertThat(component.getProvisionalEngines().size(), is(2));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
+ assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
assertThat(component.getEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(nullValue()));
@@ -187,15 +189,16 @@ public class QuickfixjComponentTest {
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(component.getEngines().size(), is(2));
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
-
+
// Move these too an endpoint testcase if one exists
assertThat(e1.isSingleton(), is(true));
assertThat(((MultipleConsumersSupport)e1).isMultipleConsumersSupported(), is(true));
assertThat(e2.isSingleton(), is(true));
assertThat(((MultipleConsumersSupport)e2).isMultipleConsumersSupported(), is(true));
}
-
+
@Test
public void createEndpointAfterComponentStart() throws Exception {
setUpComponent();
@@ -211,6 +214,7 @@ public class QuickfixjComponentTest {
Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
assertThat(component.getEngines().size(), is(1));
assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
@@ -218,12 +222,102 @@ public class QuickfixjComponentTest {
Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID));
assertThat(component.getEngines().size(), is(1));
assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
assertThat(component.getProvisionalEngines().size(), is(0));
assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
}
@Test
+ public void createEnginesLazily() throws Exception {
+ setUpComponent();
+ component.setLazyCreateEngines(true);
+
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+ writeSettings();
+
+ // start the component
+ camelContext.start();
+
+ QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+ assertThat(component.getEngines().size(), is(1));
+ assertThat(component.getProvisionalEngines().size(), is(0));
+ assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+
+ e1.ensureInitialized();
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+ }
+
+ @Test
+ public void createEndpointsInNonLazyComponent() throws Exception {
+ setUpComponent();
+ // configuration will be done per endpoint
+ component.setLazyCreateEngines(false);
+
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+ writeSettings();
+
+ // will start the component
+ camelContext.start();
+
+ QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null) + "?lazyCreateEngine=true");
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+ assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));
+
+ e1.ensureInitialized();
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+
+ writeSettings(settings, false);
+
+ // will use component's lazyCreateEngines setting
+ component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID));
+ assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
+ assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
+ assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
+ }
+
+ @Test
+ public void createEndpointsInLazyComponent() throws Exception {
+ setUpComponent();
+ component.setLazyCreateEngines(true);
+
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+ writeSettings();
+
+ // will start the component
+ camelContext.start();
+
+ // will use component's lazyCreateEngines setting
+ QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+ assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));
+
+ e1.ensureInitialized();
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+ assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+
+ writeSettings(settings, false);
+
+ // will override component's lazyCreateEngines setting
+ component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID) + "&lazyCreateEngine=false");
+ assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
+ assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
+ assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
+ }
+
+ @Test
public void componentStop() throws Exception {
setUpComponent();
@@ -259,6 +353,8 @@ public class QuickfixjComponentTest {
component.stop();
assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+ // it should still be initialized (ready to start again)
+ assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
}
@Test
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
new file mode 100644
index 0000000..dc1a2e3
--- /dev/null
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
+public class QuickfixjLazyProducerTest {
+ private Exchange mockExchange;
+ private QuickfixjEndpoint endpoint;
+ private org.apache.camel.Message mockCamelMessage;
+ private QuickfixjProducer producer;
+ private SessionID sessionID;
+ private Message inboundFixMessage;
+ private QuickfixjEngine quickfixjEngine;
+
+ @Before
+ public void setUp() throws Exception {
+ mockExchange = Mockito.mock(Exchange.class);
+ mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+ Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+
+ quickfixjEngine = TestSupport.createEngine(true);
+ endpoint = Mockito.spy(new QuickfixjEndpoint(quickfixjEngine, "", new QuickfixjComponent()));
+
+ inboundFixMessage = new Message();
+ inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
+ inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+ inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+ sessionID = MessageUtils.getSessionID(inboundFixMessage);
+
+ Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage);
+
+ Mockito.when(endpoint.getSessionID()).thenReturn(sessionID);
+
+ producer = Mockito.spy(new QuickfixjProducer(endpoint));
+ }
+
+ @Test
+ public void processWithLazyEngine() throws Exception {
+ QuickfixjEngine engine = (QuickfixjEngine) ReflectionTestUtils.getField(endpoint, "engine");
+ assertThat(engine.isInitialized(), is(false));
+ assertThat(engine.isStarted(), is(false));
+// Session mockSession = Mockito.spy(TestSupport.createSession(sessionID));
+// Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+// Mockito.doReturn(true).when(mockSession).send(Matchers.isA(Message.class));
+
+ producer.process(mockExchange);
+ assertThat(engine.isInitialized(), is(true));
+ assertThat(engine.isStarted(), is(true));
+//
+// Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+// Mockito.verify(mockSession).send(inboundFixMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
index f071bc9..1915a5f 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
@@ -67,7 +67,14 @@ public class QuickfixjSpringTest extends CamelSpringTestSupport {
assertThat(sessionProperties.get("SocketConnectProtocol").toString(), CoreMatchers.is("VM_PIPE"));
QuickfixjComponent component = context.getComponent("quickfix", QuickfixjComponent.class);
+ assertThat(component.isLazyCreateEngines(), is(false));
QuickfixjEngine engine = component.getEngines().values().iterator().next();
+ assertThat(engine.isInitialized(), is(true));
+
+ QuickfixjComponent lazyComponent = context.getComponent("lazyQuickfix", QuickfixjComponent.class);
+ assertThat(lazyComponent.isLazyCreateEngines(), is(true));
+ QuickfixjEngine lazyEngine = lazyComponent.getEngines().values().iterator().next();
+ assertThat(lazyEngine.isInitialized(), is(false));
assertThat(engine.getMessageFactory(), is(instanceOf(CustomMessageFactory.class)));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
index c0d415a..5a14e63 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
@@ -93,8 +93,12 @@ public final class TestSupport {
return factory.create(sessionID, settings);
}
-
+
public static QuickfixjEngine createEngine() throws ConfigError, FieldConvertError, IOException, JMException {
+ return createEngine(false);
+ }
+
+ public static QuickfixjEngine createEngine(boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException {
SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class);
@@ -114,6 +118,6 @@ public final class TestSupport {
return new QuickfixjEngine("", settings,
mockMessageStoreFactory,
Mockito.mock(LogFactory.class),
- Mockito.mock(MessageFactory.class));
+ Mockito.mock(MessageFactory.class), lazy);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
index 03725b7..10f9d58 100644
--- a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
+++ b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
@@ -34,6 +34,13 @@
<to uri="log:test"/>
</filter>
</route>
+ <route>
+ <from uri="lazyQuickfix:example"/>
+ <filter>
+ <simple>${in.header.EventCategory} == 'AppMessageReceived'</simple>
+ <to uri="log:test"/>
+ </filter>
+ </route>
</camelContext>
<!-- quickfix component -->
@@ -48,6 +55,19 @@
</property>
</bean>
+ <!-- lazy quickfix component -->
+ <bean id="lazyQuickfix" class="org.apache.camel.component.quickfixj.QuickfixjComponent">
+ <property name="lazyCreateEngines" value="true" />
+ <property name="configurations">
+ <util:map>
+ <entry key="example" value-ref="quickfixjConfiguration"/>
+ </util:map>
+ </property>
+ <property name="messageFactory">
+ <bean class="org.apache.camel.component.quickfixj.QuickfixjSpringTest.CustomMessageFactory"/>
+ </property>
+ </bean>
+
<!-- quickfix settings -->
<bean id="quickfixjConfiguration" class="org.apache.camel.component.quickfixj.QuickfixjConfiguration">
<property name="defaultSettings">
[3/4] git commit: Checkstyle fixes
Posted by da...@apache.org.
Checkstyle fixes
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/afb9e70c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/afb9e70c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/afb9e70c
Branch: refs/heads/master
Commit: afb9e70c4ed4df8958312206460fbab2c0612ce2
Parents: f294ba2
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Feb 11 11:38:02 2014 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Feb 11 11:38:02 2014 +0100
----------------------------------------------------------------------
.../apache/camel/component/quickfixj/QuickfixjComponent.java | 5 +++--
.../org/apache/camel/component/quickfixj/QuickfixjEngine.java | 2 +-
.../camel/component/quickfixj/QuickfixjLazyProducerTest.java | 6 +++---
3 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
index 890d1d7..cd8c014 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
@@ -45,7 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
private LogFactory logFactory;
private MessageFactory messageFactory;
private Map<String, QuickfixjConfiguration> configurations = new HashMap<String, QuickfixjConfiguration>();
- private boolean lazyCreateEngines = false;
+ private boolean lazyCreateEngines;
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -68,8 +68,9 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
settings = QuickfixjEngine.loadSettings(remaining);
}
Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE);
- if (lazyCreateEngineForEndpoint == null)
+ if (lazyCreateEngineForEndpoint == null) {
lazyCreateEngineForEndpoint = isLazyCreateEngines();
+ }
engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory,
lazyCreateEngineForEndpoint);
http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
index c1fb9ff..1016d42 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
@@ -101,7 +101,7 @@ public class QuickfixjEngine extends ServiceSupport {
private ObjectName initiatorObjectName;
private final SessionSettings settings;
private final AtomicBoolean initialized = new AtomicBoolean(false);
- private boolean lazy = false;
+ private boolean lazy;
public enum ThreadModel {
ThreadPerConnector, ThreadPerSession;
http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
index dc1a2e3..fcc8b59 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.component.quickfixj;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.junit.Before;
@@ -34,6 +31,9 @@ import quickfix.field.BeginString;
import quickfix.field.SenderCompID;
import quickfix.field.TargetCompID;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
public class QuickfixjLazyProducerTest {
private Exchange mockExchange;
private QuickfixjEndpoint endpoint;
[4/4] git commit: Merge branch 'CAMEL-7184' of https://github.com/grgrzybek/camel
Posted by da...@apache.org.
Merge branch 'CAMEL-7184' of https://github.com/grgrzybek/camel
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16f112aa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16f112aa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16f112aa
Branch: refs/heads/master
Commit: 16f112aa0a4953b1e3f11addca9cb6c6d90dffc4
Parents: 1099b8c afb9e70
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Feb 11 11:45:21 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Feb 11 11:45:21 2014 +0100
----------------------------------------------------------------------
.../component/quickfixj/QuickfixjComponent.java | 37 ++++++-
.../component/quickfixj/QuickfixjEndpoint.java | 14 +++
.../component/quickfixj/QuickfixjEngine.java | 70 +++++++++++--
.../component/quickfixj/QuickfixjProducer.java | 2 +
.../quickfixj/QuickfixjComponentTest.java | 100 ++++++++++++++++++-
.../quickfixj/QuickfixjLazyProducerTest.java | 85 ++++++++++++++++
.../quickfixj/QuickfixjProducerTest.java | 1 +
.../quickfixj/QuickfixjSpringTest.java | 7 ++
.../camel/component/quickfixj/TestSupport.java | 8 +-
.../quickfixj/QuickfixjSpringTest-context.xml | 20 ++++
10 files changed, 324 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
[2/4] git commit: Preserve headers in QuickfixjProducer.process()
Posted by da...@apache.org.
Preserve headers in QuickfixjProducer.process()
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f294ba21
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f294ba21
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f294ba21
Branch: refs/heads/master
Commit: f294ba21e9c039edbea2e593cf44ac73d81d1bbe
Parents: e6f1bc4
Author: Grzegorz Grzybek <gg...@redhat.com>
Authored: Tue Feb 11 11:15:03 2014 +0100
Committer: Grzegorz Grzybek <gg...@redhat.com>
Committed: Tue Feb 11 11:15:03 2014 +0100
----------------------------------------------------------------------
.../org/apache/camel/component/quickfixj/QuickfixjProducer.java | 1 +
.../org/apache/camel/component/quickfixj/QuickfixjProducerTest.java | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f294ba21/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
index ff262c0..9e1dbf0 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
@@ -77,6 +77,7 @@ public class QuickfixjProducer extends DefaultProducer {
if (callable != null) {
Message reply = callable.call();
+ exchange.getOut().getHeaders().putAll(camelMessage.getHeaders());
exchange.getOut().setBody(reply);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f294ba21/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
index 498f6ff..be6f341 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
@@ -153,6 +153,7 @@ public class QuickfixjProducerTest {
Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
Mockito.verify(mockSession).send(inboundFixMessage);
+ Mockito.verify(mockOutboundCamelMessage).getHeaders();
Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
}