You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/11 12:13:03 UTC

[1/4] git commit: CAMEL-7184 Allow lazy creating QuickfixJ engines

Updated Branches:
  refs/heads/master 1099b8ce9 -> 16f112aa0


CAMEL-7184 Allow lazy creating QuickfixJ engines

Introduced "lazyCreateEngines" component setting and "lazyCreateEngine"
endpoint URI parameter.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6f1bc4e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6f1bc4e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6f1bc4e

Branch: refs/heads/master
Commit: e6f1bc4e923810aa789425fb2bb86b1e3e8f1f81
Parents: 9e383f4
Author: Grzegorz Grzybek <gg...@redhat.com>
Authored: Tue Feb 11 11:04:00 2014 +0100
Committer: Grzegorz Grzybek <gg...@redhat.com>
Committed: Tue Feb 11 11:04:00 2014 +0100

----------------------------------------------------------------------
 .../component/quickfixj/QuickfixjComponent.java |  36 ++++++-
 .../component/quickfixj/QuickfixjEndpoint.java  |  14 +++
 .../component/quickfixj/QuickfixjEngine.java    |  70 +++++++++++--
 .../component/quickfixj/QuickfixjProducer.java  |   1 +
 .../quickfixj/QuickfixjComponentTest.java       | 100 ++++++++++++++++++-
 .../quickfixj/QuickfixjLazyProducerTest.java    |  85 ++++++++++++++++
 .../quickfixj/QuickfixjSpringTest.java          |   7 ++
 .../camel/component/quickfixj/TestSupport.java  |   8 +-
 .../quickfixj/QuickfixjSpringTest-context.xml   |  20 ++++
 9 files changed, 321 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
index 03b89cd..890d1d7 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
@@ -26,6 +26,7 @@ import org.apache.camel.StartupListener;
 import org.apache.camel.impl.DefaultComponent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import quickfix.LogFactory;
 import quickfix.MessageFactory;
 import quickfix.MessageStoreFactory;
@@ -33,6 +34,7 @@ import quickfix.SessionSettings;
 
 public class QuickfixjComponent extends DefaultComponent implements StartupListener {
     private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
+    private static final String PARAMETER_LAZY_CREATE_ENGINE = "lazyCreateEngine";
 
     private final Object engineInstancesLock = new Object();
     private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>();
@@ -43,6 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
     private LogFactory logFactory;
     private MessageFactory messageFactory;
     private Map<String, QuickfixjConfiguration> configurations = new HashMap<String, QuickfixjConfiguration>();
+    private boolean lazyCreateEngines = false;
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -58,12 +61,17 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
                 }
                 if (engine == null) {
                     QuickfixjConfiguration configuration = configurations.get(remaining);
+                    SessionSettings settings = null;
                     if (configuration != null) {
-                        SessionSettings settings = configuration.createSessionSettings();
-                        engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory);
+                        settings = configuration.createSessionSettings();
                     } else {
-                        engine = new QuickfixjEngine(uri, remaining, messageStoreFactory, logFactory, messageFactory);
+                        settings = QuickfixjEngine.loadSettings(remaining);
                     }
+                    Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE);
+                    if (lazyCreateEngineForEndpoint == null)
+                        lazyCreateEngineForEndpoint = isLazyCreateEngines();
+                    engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory,
+                            lazyCreateEngineForEndpoint);
 
                     // only start engine if CamelContext is already started, otherwise the engines gets started
                     // automatic later when CamelContext has been started using the StartupListener
@@ -112,8 +120,12 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
     }
 
     private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception {
-        LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
-        engine.start();
+        if (!engine.isLazy()) {
+            LOG.info("Starting QuickFIX/J engine: {}", engine.getUri());
+            engine.start();
+        } else {
+            LOG.info("QuickFIX/J engine: {} will start lazily", engine.getUri());
+        }
     }
 
     // Test Support
@@ -153,6 +165,20 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
         this.configurations = configurations;
     }
 
+    public boolean isLazyCreateEngines() {
+        return this.lazyCreateEngines;
+    }
+
+    /**
+     * If set to <code>true</code>, the engines will be created and started when needed (when first message
+     * is send)
+     *
+     * @param lazyCreateEngines
+     */
+    public void setLazyCreateEngines(boolean lazyCreateEngines) {
+        this.lazyCreateEngines = lazyCreateEngines;
+    }
+
     @Override
     public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
         // only start quickfix engines when CamelContext have finished starting

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 44a117d..09f0822 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -136,6 +136,20 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent
         return true;
     }
 
+    /**
+     * Initializing and starts the engine if it wasn't initialized so far.
+     */
+    public void ensureInitialized() throws Exception {
+        if (!engine.isInitialized()) {
+            synchronized (engine) {
+                if (!engine.isInitialized()) {
+                    engine.initializeEngine();
+                    engine.start();
+                }
+            }
+        }
+    }
+
     public QuickfixjEngine getEngine() {
         return engine;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
index 673fd51..c1fb9ff 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.management.JMException;
 import javax.management.ObjectName;
@@ -87,17 +88,20 @@ public class QuickfixjEngine extends ServiceSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);
 
-    private final Acceptor acceptor;
-    private final Initiator initiator;
-    private final JmxExporter jmxExporter;
-    private final MessageStoreFactory messageStoreFactory;
-    private final LogFactory sessionLogFactory;
-    private final MessageFactory messageFactory;
+    private Acceptor acceptor;
+    private Initiator initiator;
+    private JmxExporter jmxExporter;
+    private MessageStoreFactory messageStoreFactory;
+    private LogFactory sessionLogFactory;
+    private MessageFactory messageFactory;
     private final MessageCorrelator messageCorrelator = new MessageCorrelator();
     private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
     private final String uri;
     private ObjectName acceptorObjectName;
     private ObjectName initiatorObjectName;
+    private final SessionSettings settings;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private boolean lazy = false;
 
     public enum ThreadModel {
         ThreadPerConnector, ThreadPerSession;
@@ -148,13 +152,48 @@ public class QuickfixjEngine extends ServiceSupport {
 
     public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
                            MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
+        this(uri, settings, messageStoreFactoryOverride, sessionLogFactoryOverride, messageFactoryOverride, false);
+    }
+
+    public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
+            MessageFactory messageFactoryOverride, boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException {
         addEventListener(messageCorrelator);
 
         this.uri = uri;
-        
-        messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory();
-        sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings);
-        messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings);
+        this.lazy = lazy;
+        this.settings = settings;
+
+        // overrides
+        if (messageFactoryOverride != null) {
+            messageFactory = messageFactoryOverride;
+        }
+        if (sessionLogFactoryOverride != null) {
+            sessionLogFactory = sessionLogFactoryOverride;
+        }
+        if (messageStoreFactoryOverride != null) {
+            messageStoreFactory = messageStoreFactoryOverride;
+        }
+
+        if (!lazy) {
+            initializeEngine();
+        }
+    }
+
+    /**
+     * Initializes the engine on demand. May be called immediately in constructor or when needed.
+     * If initializing later, it should be started afterwards.
+     */
+    void initializeEngine() throws ConfigError,
+            FieldConvertError, JMException {
+        if (messageFactory == null) {
+            messageFactory = new DefaultMessageFactory();
+        }
+        if (sessionLogFactory == null) {
+            sessionLogFactory = inferLogFactory(settings);
+        }
+        if (messageStoreFactory == null) {
+            messageStoreFactory = inferMessageStoreFactory(settings);
+        }
 
         // Set default session schedule if not specified in configuration
         if (!settings.isSetting(Session.SETTING_START_TIME)) {
@@ -208,9 +247,10 @@ public class QuickfixjEngine extends ServiceSupport {
         } finally {
             Thread.currentThread().setContextClassLoader(ccl);
         }
+        initialized.set(true);
     }
 
-    private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
+    static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
         InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
         if (inputStream == null) {
             throw new IllegalArgumentException("Could not load " + settingsResourceName);
@@ -507,6 +547,14 @@ public class QuickfixjEngine extends ServiceSupport {
         return messageCorrelator;
     }
 
+    public boolean isInitialized() {
+        return this.initialized.get();
+    }
+
+    public boolean isLazy() {
+        return this.lazy;
+    }
+
     // For Testing
     Initiator getInitiator() {
         return initiator;

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
index 4114cc0..ff262c0 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
@@ -43,6 +43,7 @@ public class QuickfixjProducer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         try {
+            getEndpoint().ensureInitialized();
             sendMessage(exchange, exchange.getIn());
         } catch (Exception e) {
             exchange.setException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
index 532a9b6..a8a369b 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
@@ -168,6 +168,7 @@ public class QuickfixjComponentTest {
         Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
         assertThat(component.getProvisionalEngines().size(), is(1));
         assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
         assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
         assertThat(component.getEngines().size(), is(0));
         assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
@@ -178,6 +179,7 @@ public class QuickfixjComponentTest {
         Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile2.getName(), null));
         assertThat(component.getProvisionalEngines().size(), is(2));
         assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true));
         assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false));
         assertThat(component.getEngines().size(), is(0));
         assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(nullValue()));
@@ -187,15 +189,16 @@ public class QuickfixjComponentTest {
 
         assertThat(component.getProvisionalEngines().size(), is(0));
         assertThat(component.getEngines().size(), is(2));
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
         assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
-        
+
         // Move these too an endpoint testcase if one exists
         assertThat(e1.isSingleton(), is(true));
         assertThat(((MultipleConsumersSupport)e1).isMultipleConsumersSupported(), is(true));
         assertThat(e2.isSingleton(), is(true));
         assertThat(((MultipleConsumersSupport)e2).isMultipleConsumersSupported(), is(true));
     }
-    
+
     @Test
     public void createEndpointAfterComponentStart() throws Exception {
         setUpComponent();
@@ -211,6 +214,7 @@ public class QuickfixjComponentTest {
         Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
         assertThat(component.getEngines().size(), is(1));
         assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
         assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
         assertThat(component.getProvisionalEngines().size(), is(0));
         assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
@@ -218,12 +222,102 @@ public class QuickfixjComponentTest {
         Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID));
         assertThat(component.getEngines().size(), is(1));
         assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
         assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
         assertThat(component.getProvisionalEngines().size(), is(0));
         assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
     }
 
     @Test
+    public void createEnginesLazily() throws Exception {
+        setUpComponent();
+        component.setLazyCreateEngines(true);
+
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        // start the component
+        camelContext.start();
+
+        QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        assertThat(component.getEngines().size(), is(1));
+        assertThat(component.getProvisionalEngines().size(), is(0));
+        assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+
+        e1.ensureInitialized();
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+    }
+
+    @Test
+    public void createEndpointsInNonLazyComponent() throws Exception {
+        setUpComponent();
+        // configuration will be done per endpoint
+        component.setLazyCreateEngines(false);
+
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        // will start the component
+        camelContext.start();
+
+        QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null) + "?lazyCreateEngine=true");
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+        assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));
+
+        e1.ensureInitialized();
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+
+        writeSettings(settings, false);
+
+        // will use connector's lazyCreateEngines setting 
+        component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID));
+        assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
+        assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
+        assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
+    }
+
+    @Test
+    public void createEndpointsInLazyComponent() throws Exception {
+        setUpComponent();
+        component.setLazyCreateEngines(true);
+
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        // will start the component
+        camelContext.start();
+
+        // will use connector's lazyCreateEngines setting
+        QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+        assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true));
+
+        e1.ensureInitialized();
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+
+        writeSettings(settings, false);
+
+        // will override connector's lazyCreateEngines setting
+        component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID) + "&lazyCreateEngine=false");
+        assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true));
+        assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true));
+        assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false));
+    }
+
+    @Test
     public void componentStop() throws Exception {
         setUpComponent();
 
@@ -259,6 +353,8 @@ public class QuickfixjComponentTest {
         component.stop();
         
         assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+        // it should still be initialized (ready to start again)
+        assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
new file mode 100644
index 0000000..dc1a2e3
--- /dev/null
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
+public class QuickfixjLazyProducerTest {
+    private Exchange mockExchange;
+    private QuickfixjEndpoint endpoint;
+    private org.apache.camel.Message mockCamelMessage;
+    private QuickfixjProducer producer;
+    private SessionID sessionID;
+    private Message inboundFixMessage;
+    private QuickfixjEngine quickfixjEngine;
+
+    @Before
+    public void setUp() throws Exception {
+        mockExchange = Mockito.mock(Exchange.class);
+        mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+        Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+        
+        quickfixjEngine = TestSupport.createEngine(true);
+        endpoint = Mockito.spy(new QuickfixjEndpoint(quickfixjEngine, "", new QuickfixjComponent()));
+
+        inboundFixMessage = new Message();
+        inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
+        inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+        inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+        sessionID = MessageUtils.getSessionID(inboundFixMessage);
+   
+        Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage);
+
+        Mockito.when(endpoint.getSessionID()).thenReturn(sessionID);     
+
+        producer = Mockito.spy(new QuickfixjProducer(endpoint));
+    }
+
+    @Test
+    public void processWithLazyEngine() throws Exception {
+        QuickfixjEngine engine = (QuickfixjEngine) ReflectionTestUtils.getField(endpoint, "engine");
+        assertThat(engine.isInitialized(), is(false));
+        assertThat(engine.isStarted(), is(false));
+//        Session mockSession = Mockito.spy(TestSupport.createSession(sessionID));
+//        Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+//        Mockito.doReturn(true).when(mockSession).send(Matchers.isA(Message.class));
+
+        producer.process(mockExchange);
+        assertThat(engine.isInitialized(), is(true));
+        assertThat(engine.isStarted(), is(true));
+//
+//        Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+//        Mockito.verify(mockSession).send(inboundFixMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
index f071bc9..1915a5f 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java
@@ -67,7 +67,14 @@ public class QuickfixjSpringTest extends CamelSpringTestSupport {
         assertThat(sessionProperties.get("SocketConnectProtocol").toString(), CoreMatchers.is("VM_PIPE"));
 
         QuickfixjComponent component = context.getComponent("quickfix", QuickfixjComponent.class);
+        assertThat(component.isLazyCreateEngines(), is(false));
         QuickfixjEngine engine = component.getEngines().values().iterator().next();
+        assertThat(engine.isInitialized(), is(true));
+
+        QuickfixjComponent lazyComponent = context.getComponent("lazyQuickfix", QuickfixjComponent.class);
+        assertThat(lazyComponent.isLazyCreateEngines(), is(true));
+        QuickfixjEngine lazyEngine = lazyComponent.getEngines().values().iterator().next();
+        assertThat(lazyEngine.isInitialized(), is(false));
 
         assertThat(engine.getMessageFactory(), is(instanceOf(CustomMessageFactory.class)));
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
index c0d415a..5a14e63 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
@@ -93,8 +93,12 @@ public final class TestSupport {
         
         return factory.create(sessionID, settings);
     }
-    
+
     public static QuickfixjEngine createEngine() throws ConfigError, FieldConvertError, IOException, JMException {       
+        return createEngine(false);
+    }
+
+    public static QuickfixjEngine createEngine(boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException {       
         SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
 
         MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class);
@@ -114,6 +118,6 @@ public final class TestSupport {
         return new QuickfixjEngine("", settings, 
             mockMessageStoreFactory, 
             Mockito.mock(LogFactory.class), 
-            Mockito.mock(MessageFactory.class));
+            Mockito.mock(MessageFactory.class), lazy);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
index 03725b7..10f9d58 100644
--- a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
+++ b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml
@@ -34,6 +34,13 @@
                 <to uri="log:test"/>
             </filter>
         </route>
+        <route>
+            <from uri="lazyQuickfix:example"/>
+            <filter>
+                <simple>${in.header.EventCategory} == 'AppMessageReceived'</simple>
+                <to uri="log:test"/>
+            </filter>
+        </route>
     </camelContext>
 
     <!-- quickfix component -->
@@ -48,6 +55,19 @@
         </property>
     </bean>
 
+    <!-- lazy quickfix component -->
+    <bean id="lazyQuickfix" class="org.apache.camel.component.quickfixj.QuickfixjComponent">
+        <property name="lazyCreateEngines" value="true" />
+        <property name="configurations">
+            <util:map>
+                <entry key="example" value-ref="quickfixjConfiguration"/>
+            </util:map>
+        </property>
+        <property name="messageFactory">
+            <bean class="org.apache.camel.component.quickfixj.QuickfixjSpringTest.CustomMessageFactory"/>
+        </property>
+    </bean>
+
     <!-- quickfix settings -->
     <bean id="quickfixjConfiguration" class="org.apache.camel.component.quickfixj.QuickfixjConfiguration">
         <property name="defaultSettings">


[3/4] git commit: Checkstyle fixes

Posted by da...@apache.org.
Checkstyle fixes


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/afb9e70c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/afb9e70c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/afb9e70c

Branch: refs/heads/master
Commit: afb9e70c4ed4df8958312206460fbab2c0612ce2
Parents: f294ba2
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Feb 11 11:38:02 2014 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Feb 11 11:38:02 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/component/quickfixj/QuickfixjComponent.java   | 5 +++--
 .../org/apache/camel/component/quickfixj/QuickfixjEngine.java  | 2 +-
 .../camel/component/quickfixj/QuickfixjLazyProducerTest.java   | 6 +++---
 3 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
index 890d1d7..cd8c014 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
@@ -45,7 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
     private LogFactory logFactory;
     private MessageFactory messageFactory;
     private Map<String, QuickfixjConfiguration> configurations = new HashMap<String, QuickfixjConfiguration>();
-    private boolean lazyCreateEngines = false;
+    private boolean lazyCreateEngines;
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -68,8 +68,9 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe
                         settings = QuickfixjEngine.loadSettings(remaining);
                     }
                     Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE);
-                    if (lazyCreateEngineForEndpoint == null)
+                    if (lazyCreateEngineForEndpoint == null) {
                         lazyCreateEngineForEndpoint = isLazyCreateEngines();
+                    }
                     engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory,
                             lazyCreateEngineForEndpoint);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
index c1fb9ff..1016d42 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
@@ -101,7 +101,7 @@ public class QuickfixjEngine extends ServiceSupport {
     private ObjectName initiatorObjectName;
     private final SessionSettings settings;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
-    private boolean lazy = false;
+    private boolean lazy;
 
     public enum ThreadModel {
         ThreadPerConnector, ThreadPerSession;

http://git-wip-us.apache.org/repos/asf/camel/blob/afb9e70c/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
index dc1a2e3..fcc8b59 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.component.quickfixj;
 
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.junit.Before;
@@ -34,6 +31,9 @@ import quickfix.field.BeginString;
 import quickfix.field.SenderCompID;
 import quickfix.field.TargetCompID;
 
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
 public class QuickfixjLazyProducerTest {
     private Exchange mockExchange;
     private QuickfixjEndpoint endpoint;


[4/4] git commit: Merge branch 'CAMEL-7184' of https://github.com/grgrzybek/camel

Posted by da...@apache.org.
Merge branch 'CAMEL-7184' of https://github.com/grgrzybek/camel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16f112aa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16f112aa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16f112aa

Branch: refs/heads/master
Commit: 16f112aa0a4953b1e3f11addca9cb6c6d90dffc4
Parents: 1099b8c afb9e70
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Feb 11 11:45:21 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Feb 11 11:45:21 2014 +0100

----------------------------------------------------------------------
 .../component/quickfixj/QuickfixjComponent.java |  37 ++++++-
 .../component/quickfixj/QuickfixjEndpoint.java  |  14 +++
 .../component/quickfixj/QuickfixjEngine.java    |  70 +++++++++++--
 .../component/quickfixj/QuickfixjProducer.java  |   2 +
 .../quickfixj/QuickfixjComponentTest.java       | 100 ++++++++++++++++++-
 .../quickfixj/QuickfixjLazyProducerTest.java    |  85 ++++++++++++++++
 .../quickfixj/QuickfixjProducerTest.java        |   1 +
 .../quickfixj/QuickfixjSpringTest.java          |   7 ++
 .../camel/component/quickfixj/TestSupport.java  |   8 +-
 .../quickfixj/QuickfixjSpringTest-context.xml   |  20 ++++
 10 files changed, 324 insertions(+), 20 deletions(-)
----------------------------------------------------------------------



[2/4] git commit: Preserve headers in QuickfixjProducer.process()

Posted by da...@apache.org.
Preserve headers in QuickfixjProducer.process()

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f294ba21
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f294ba21
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f294ba21

Branch: refs/heads/master
Commit: f294ba21e9c039edbea2e593cf44ac73d81d1bbe
Parents: e6f1bc4
Author: Grzegorz Grzybek <gg...@redhat.com>
Authored: Tue Feb 11 11:15:03 2014 +0100
Committer: Grzegorz Grzybek <gg...@redhat.com>
Committed: Tue Feb 11 11:15:03 2014 +0100

----------------------------------------------------------------------
 .../org/apache/camel/component/quickfixj/QuickfixjProducer.java     | 1 +
 .../org/apache/camel/component/quickfixj/QuickfixjProducerTest.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f294ba21/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
index ff262c0..9e1dbf0 100644
--- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
+++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
@@ -77,6 +77,7 @@ public class QuickfixjProducer extends DefaultProducer {
 
         if (callable != null) {
             Message reply = callable.call();
+            exchange.getOut().getHeaders().putAll(camelMessage.getHeaders());
             exchange.getOut().setBody(reply);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f294ba21/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
index 498f6ff..be6f341 100644
--- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
+++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
@@ -153,6 +153,7 @@ public class QuickfixjProducerTest {
         
         Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
         Mockito.verify(mockSession).send(inboundFixMessage);
+        Mockito.verify(mockOutboundCamelMessage).getHeaders();
         Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
     }