You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/12/02 18:03:50 UTC

svn commit: r1209585 [13/22] - in /camel/trunk: ./ apache-camel/ buildingtools/ camel-core/ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/api/management/ camel-core/src/main/java/org/apache/camel/builder/ camel-co...

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java Fri Dec  2 17:03:07 2011
@@ -1,73 +1,73 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
-
-import quickfix.MessageUtils;
-import quickfix.Session;
-import quickfix.SessionID;
-
-public class QuickfixjConsumer extends DefaultConsumer {
-
-    public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
-        super(endpoint, processor);
-    }
-
-    public void onExchange(Exchange exchange) throws Exception {
-        if (isStarted()) {
-            try {
-                getProcessor().process(exchange);
-                if (exchange.getPattern().isOutCapable() && exchange.hasOut()) {
-                    sendOutMessage(exchange);
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-        }
-    }
-
-    private void sendOutMessage(Exchange exchange) {
-        try {
-            Message camelMessage = exchange.getOut();
-            quickfix.Message quickfixjMessage = camelMessage.getBody(quickfix.Message.class);
-     
-            if (log.isDebugEnabled()) {
-                log.debug("Sending FIX message reply: " + quickfixjMessage.toString());
-            }
-            
-            SessionID messageSessionID = MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
-            
-            Session session = getSession(messageSessionID);
-            if (session == null) {
-                throw new IllegalStateException("Unknown session: " + messageSessionID);
-            }
-            
-            session.send(quickfixjMessage);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-    }
-
-    Session getSession(SessionID messageSessionID) {
-        return Session.lookupSession(messageSessionID);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+
+public class QuickfixjConsumer extends DefaultConsumer {
+
+    public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    public void onExchange(Exchange exchange) throws Exception {
+        if (isStarted()) {
+            try {
+                getProcessor().process(exchange);
+                if (exchange.getPattern().isOutCapable() && exchange.hasOut()) {
+                    sendOutMessage(exchange);
+                }
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+    }
+
+    private void sendOutMessage(Exchange exchange) {
+        try {
+            Message camelMessage = exchange.getOut();
+            quickfix.Message quickfixjMessage = camelMessage.getBody(quickfix.Message.class);
+     
+            if (log.isDebugEnabled()) {
+                log.debug("Sending FIX message reply: " + quickfixjMessage.toString());
+            }
+            
+            SessionID messageSessionID = MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
+            
+            Session session = getSession(messageSessionID);
+            if (session == null) {
+                throw new IllegalStateException("Unknown session: " + messageSessionID);
+            }
+            
+            session.send(quickfixjMessage);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+    }
+
+    Session getSession(SessionID messageSessionID) {
+        return Session.lookupSession(messageSessionID);
+    }
+}

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java Fri Dec  2 17:03:07 2011
@@ -1,139 +1,139 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.MultipleConsumersSupport;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.ResolveEndpointFailedException;
-import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
-import org.apache.camel.impl.DefaultEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import quickfix.Message;
-import quickfix.SessionID;
-
-public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEventListener, MultipleConsumersSupport {
-    public static final String EVENT_CATEGORY_KEY = "EventCategory";
-    public static final String SESSION_ID_KEY = "SessionID";
-    public static final String MESSAGE_TYPE_KEY = "MessageType";
-    public static final String DATA_DICTIONARY_KEY = "DataDictionary";
-
-    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class);
-
-    private SessionID sessionID;
-    private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
-    private final QuickfixjEngine engine;
-
-    public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext context) {
-        super(uri, context);
-        this.engine = engine;
-    }
-
-    public SessionID getSessionID() {
-        return sessionID;
-    }
-
-    public void setSessionID(SessionID sessionID) {
-        this.sessionID = sessionID;
-    }
-
-    public Consumer createConsumer(Processor processor) throws Exception {
-        LOG.info("Creating QuickFIX/J consumer: "
-            + (sessionID != null ? sessionID : "No Session")
-            + ", ExchangePattern=" + getExchangePattern());
-        QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
-        consumers.add(consumer);
-        return consumer;
-    }
-
-    public Producer createProducer() throws Exception {
-        LOG.info("Creating QuickFIX/J producer: "
-            + (sessionID != null ? sessionID : "No Session"));
-        if (isWildcarded()) {
-            throw new ResolveEndpointFailedException("Cannot create consumer on wildcarded session identifier: " + sessionID);
-        }
-        return new QuickfixjProducer(this);
-    }
-
-    public boolean isSingleton() {
-        return true;
-    }
-
-    public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception {
-        if (this.sessionID == null || isMatching(sessionID)) {
-            for (QuickfixjConsumer consumer : consumers) {
-                Exchange exchange = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory);
-                consumer.onExchange(exchange);
-                if (exchange.getException() != null) {
-                    throw exchange.getException();
-                }
-            }
-        }
-    }
-
-    private boolean isMatching(SessionID sessionID) {
-        if (this.sessionID.equals(sessionID)) {
-            return true;
-        }
-        return isMatching(this.sessionID.getBeginString(), sessionID.getBeginString())
-            && isMatching(this.sessionID.getSenderCompID(), sessionID.getSenderCompID())
-            && isMatching(this.sessionID.getSenderSubID(), sessionID.getSenderSubID())
-            && isMatching(this.sessionID.getSenderLocationID(), sessionID.getSenderLocationID())
-            && isMatching(this.sessionID.getTargetCompID(), sessionID.getTargetCompID())
-            && isMatching(this.sessionID.getTargetSubID(), sessionID.getTargetSubID()) 
-            && isMatching(this.sessionID.getTargetLocationID(), sessionID.getTargetLocationID());
-    }
-
-    private boolean isMatching(String s1, String s2) {
-        return s1.equals("") || s1.equals("*") || s1.equals(s2);
-    }
-
-    private boolean isWildcarded() {
-        if (sessionID == null) {
-            return false;
-        }
-        return sessionID.getBeginString().equals("*")
-            || sessionID.getSenderCompID().equals("*")
-            || sessionID.getSenderSubID().equals("*")
-            || sessionID.getSenderLocationID().equals("*")
-            || sessionID.getTargetCompID().equals("*")
-            || sessionID.getTargetSubID().equals("*")
-            || sessionID.getTargetLocationID().equals("*");
-    }
-
-    public boolean isMultipleConsumersSupported() {
-        return true;
-    }
-
-    public QuickfixjEngine getEngine() {
-        return engine;
-    }
-    
-    @Override
-    protected void doStop() throws Exception {
-        // clear list of consumers
-        consumers.clear();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEventListener, MultipleConsumersSupport {
+    public static final String EVENT_CATEGORY_KEY = "EventCategory";
+    public static final String SESSION_ID_KEY = "SessionID";
+    public static final String MESSAGE_TYPE_KEY = "MessageType";
+    public static final String DATA_DICTIONARY_KEY = "DataDictionary";
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class);
+
+    private SessionID sessionID;
+    private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
+    private final QuickfixjEngine engine;
+
+    public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext context) {
+        super(uri, context);
+        this.engine = engine;
+    }
+
+    public SessionID getSessionID() {
+        return sessionID;
+    }
+
+    public void setSessionID(SessionID sessionID) {
+        this.sessionID = sessionID;
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        LOG.info("Creating QuickFIX/J consumer: "
+            + (sessionID != null ? sessionID : "No Session")
+            + ", ExchangePattern=" + getExchangePattern());
+        QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
+        consumers.add(consumer);
+        return consumer;
+    }
+
+    public Producer createProducer() throws Exception {
+        LOG.info("Creating QuickFIX/J producer: "
+            + (sessionID != null ? sessionID : "No Session"));
+        if (isWildcarded()) {
+            throw new ResolveEndpointFailedException("Cannot create consumer on wildcarded session identifier: " + sessionID);
+        }
+        return new QuickfixjProducer(this);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception {
+        if (this.sessionID == null || isMatching(sessionID)) {
+            for (QuickfixjConsumer consumer : consumers) {
+                Exchange exchange = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory);
+                consumer.onExchange(exchange);
+                if (exchange.getException() != null) {
+                    throw exchange.getException();
+                }
+            }
+        }
+    }
+
+    private boolean isMatching(SessionID sessionID) {
+        if (this.sessionID.equals(sessionID)) {
+            return true;
+        }
+        return isMatching(this.sessionID.getBeginString(), sessionID.getBeginString())
+            && isMatching(this.sessionID.getSenderCompID(), sessionID.getSenderCompID())
+            && isMatching(this.sessionID.getSenderSubID(), sessionID.getSenderSubID())
+            && isMatching(this.sessionID.getSenderLocationID(), sessionID.getSenderLocationID())
+            && isMatching(this.sessionID.getTargetCompID(), sessionID.getTargetCompID())
+            && isMatching(this.sessionID.getTargetSubID(), sessionID.getTargetSubID()) 
+            && isMatching(this.sessionID.getTargetLocationID(), sessionID.getTargetLocationID());
+    }
+
+    private boolean isMatching(String s1, String s2) {
+        return s1.equals("") || s1.equals("*") || s1.equals(s2);
+    }
+
+    private boolean isWildcarded() {
+        if (sessionID == null) {
+            return false;
+        }
+        return sessionID.getBeginString().equals("*")
+            || sessionID.getSenderCompID().equals("*")
+            || sessionID.getSenderSubID().equals("*")
+            || sessionID.getSenderLocationID().equals("*")
+            || sessionID.getTargetCompID().equals("*")
+            || sessionID.getTargetSubID().equals("*")
+            || sessionID.getTargetLocationID().equals("*");
+    }
+
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+
+    public QuickfixjEngine getEngine() {
+        return engine;
+    }
+    
+    @Override
+    protected void doStop() throws Exception {
+        // clear list of consumers
+        consumers.clear();
+    }
+}

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Fri Dec  2 17:03:07 2011
@@ -1,492 +1,492 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.JMException;
-
-import org.apache.camel.util.ObjectHelper;
-import org.quickfixj.jmx.JmxExporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import quickfix.Acceptor;
-import quickfix.Application;
-import quickfix.ConfigError;
-import quickfix.DefaultMessageFactory;
-import quickfix.DoNotSend;
-import quickfix.FieldConvertError;
-import quickfix.FieldNotFound;
-import quickfix.FileLogFactory;
-import quickfix.FileStoreFactory;
-import quickfix.IncorrectDataFormat;
-import quickfix.IncorrectTagValue;
-import quickfix.Initiator;
-import quickfix.JdbcLogFactory;
-import quickfix.JdbcSetting;
-import quickfix.JdbcStoreFactory;
-import quickfix.LogFactory;
-import quickfix.MemoryStoreFactory;
-import quickfix.Message;
-import quickfix.MessageFactory;
-import quickfix.MessageStoreFactory;
-import quickfix.RejectLogon;
-import quickfix.SLF4JLogFactory;
-import quickfix.ScreenLogFactory;
-import quickfix.Session;
-import quickfix.SessionFactory;
-import quickfix.SessionID;
-import quickfix.SessionSettings;
-import quickfix.SleepycatStoreFactory;
-import quickfix.SocketAcceptor;
-import quickfix.SocketInitiator;
-import quickfix.ThreadedSocketAcceptor;
-import quickfix.ThreadedSocketInitiator;
-import quickfix.UnsupportedMessageType;
-
-/**
- * This is a wrapper class that provided QuickFIX/J initialization capabilities
- * beyond those supported in the core QuickFIX/J distribution.
- * 
- * Specifically, it infers dependencies on specific implementations of message
- * stores and logs. It also supports extended QuickFIX/J settings properties to
- * specify threading models, custom store and log implementations, etc.
- * 
- * The wrapper will create an initiator or acceptor or both depending on the
- * roles of sessions described in the settings file.
- */
-public class QuickfixjEngine {
-    public static final String DEFAULT_START_TIME = "00:00:00";
-    public static final String DEFAULT_END_TIME = "00:00:00";
-    public static final long DEFAULT_HEARTBTINT = 30;
-    public static final String SETTING_THREAD_MODEL = "ThreadModel";
-    public static final String SETTING_USE_JMX = "UseJmx";
-
-    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);
-
-    private final Acceptor acceptor;
-    private final Initiator initiator;
-    private final JmxExporter jmxExporter;
-    private final boolean forcedShutdown;
-    private final MessageStoreFactory messageStoreFactory;
-    private final LogFactory sessionLogFactory;
-    private final MessageFactory messageFactory;
-    private final MessageCorrelator messageCorrelator = new MessageCorrelator();
-    
-    private boolean started;
-    private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
-
-    private final String uri;
-
-    public enum ThreadModel {
-        ThreadPerConnector, ThreadPerSession;
-    }
-
-    public QuickfixjEngine(String uri, String settingsResourceName, boolean forcedShutdown)
-        throws ConfigError, FieldConvertError, IOException, JMException {
-
-        this(uri, settingsResourceName, forcedShutdown, null, null, null);
-    }
-
-    public QuickfixjEngine(String uri, String settingsResourceName, boolean forcedShutdown,
-            MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
-            MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
-        this(uri, loadSettings(settingsResourceName), forcedShutdown, messageStoreFactoryOverride,
-                sessionLogFactoryOverride, messageFactoryOverride);
-    }
-
-    public QuickfixjEngine(String uri, SessionSettings settings, boolean forcedShutdown,
-            MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
-            MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
-
-        addEventListener(messageCorrelator);
-
-        this.uri = uri;
-        this.forcedShutdown = forcedShutdown;
-        
-        messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory();
-        sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings);
-        messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings);
-
-        // Set default session schedule if not specified in configuration
-        if (!settings.isSetting(Session.SETTING_START_TIME)) {
-            settings.setString(Session.SETTING_START_TIME, DEFAULT_START_TIME);
-        }
-        if (!settings.isSetting(Session.SETTING_END_TIME)) {
-            settings.setString(Session.SETTING_END_TIME, DEFAULT_END_TIME);
-        }
-        // Default heartbeat interval
-        if (!settings.isSetting(Session.SETTING_HEARTBTINT)) {
-            settings.setLong(Session.SETTING_HEARTBTINT, DEFAULT_HEARTBTINT);
-        }
-
-        // Allow specification of the QFJ threading model
-        ThreadModel threadModel = ThreadModel.ThreadPerConnector;
-        if (settings.isSetting(SETTING_THREAD_MODEL)) {
-            threadModel = ThreadModel.valueOf(settings.getString(SETTING_THREAD_MODEL));
-        }
-
-        if (settings.isSetting(SETTING_USE_JMX) && settings.getBool(SETTING_USE_JMX)) {
-            LOG.info("Enabling JMX for QuickFIX/J");
-            jmxExporter = new JmxExporter();
-        } else {
-            jmxExporter = null;
-        }
-
-        // From original component implementation...
-        // To avoid this exception in OSGi platform
-        // java.lang.NoClassDefFoundError: quickfix/fix41/MessageFactory
-        ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-        try {
-            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
-            if (isConnectorRole(settings, SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
-                acceptor = createAcceptor(new Dispatcher(), settings, messageStoreFactory, 
-                    sessionLogFactory, messageFactory, threadModel);
-            } else {
-                acceptor = null;
-            }
-
-            if (isConnectorRole(settings, SessionFactory.INITIATOR_CONNECTION_TYPE)) {
-                initiator = createInitiator(new Dispatcher(), settings, messageStoreFactory, 
-                    sessionLogFactory, messageFactory, threadModel);
-            } else {
-                initiator = null;
-            }
-
-            if (acceptor == null && initiator == null) {
-                throw new ConfigError("No connector role");
-            }
-        } finally {
-            Thread.currentThread().setContextClassLoader(ccl);
-        }
-    }
-
-    private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
-        InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
-        if (inputStream == null) {
-            throw new IllegalArgumentException("Could not load " + settingsResourceName);
-        }
-        return new SessionSettings(inputStream);
-    }
-
-    public void start() throws Exception {
-        if (acceptor != null) {
-            acceptor.start();
-            if (jmxExporter != null) {
-                jmxExporter.register(acceptor);
-            }
-        }
-        if (initiator != null) {
-            initiator.start();
-            if (jmxExporter != null) {
-                jmxExporter.register(initiator);
-            }
-        }
-        started = true;
-    }
-
-    public void stop() throws Exception {
-        stop(forcedShutdown);
-    }
-
-    public void stop(boolean force) throws Exception {
-        if (acceptor != null) {
-            acceptor.stop();
-        }
-        if (initiator != null) {
-            initiator.stop();
-        }
-        started = false;
-    }
-
-    public boolean isStarted() {
-        return started;
-    }
-    
-    private Initiator createInitiator(Application application, SessionSettings settings, 
-            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory, 
-            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
-        
-        Initiator initiator;
-        if (threadModel == ThreadModel.ThreadPerSession) {
-            initiator = new ThreadedSocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
-        } else if (threadModel == ThreadModel.ThreadPerConnector) {
-            initiator = new SocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
-        } else {
-            throw new ConfigError("Unknown thread mode: " + threadModel);
-        }
-        return initiator;
-    }
-
-    private Acceptor createAcceptor(Application application, SessionSettings settings,
-            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory, 
-            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
-
-        Acceptor acceptor;
-        if (threadModel == ThreadModel.ThreadPerSession) {
-            acceptor = new ThreadedSocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
-        } else if (threadModel == ThreadModel.ThreadPerConnector) {
-            acceptor = new SocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
-        } else {
-            throw new ConfigError("Unknown thread mode: " + threadModel);
-        }
-        return acceptor;
-    }
-
-    private MessageStoreFactory inferMessageStoreFactory(SessionSettings settings) throws ConfigError {
-        Set<MessageStoreFactory> impliedMessageStoreFactories = new HashSet<MessageStoreFactory>();
-        isJdbcStore(settings, impliedMessageStoreFactories);
-        isFileStore(settings, impliedMessageStoreFactories);
-        isSleepycatStore(settings, impliedMessageStoreFactories);
-        if (impliedMessageStoreFactories.size() > 1) {
-            throw new ConfigError("Ambiguous message store implied in configuration.");
-        }
-        MessageStoreFactory messageStoreFactory;
-        if (impliedMessageStoreFactories.size() == 1) {
-            messageStoreFactory = (MessageStoreFactory) impliedMessageStoreFactories.iterator().next();
-        } else {
-            messageStoreFactory = new MemoryStoreFactory();
-        }
-        LOG.info("Inferring message store factory: " + messageStoreFactory.getClass().getName());
-        return messageStoreFactory;
-    }
-
-    private void isSleepycatStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
-        if (settings.isSetting(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR)) {
-            impliedMessageStoreFactories.add(new SleepycatStoreFactory(settings));
-        }
-    }
-
-    private void isFileStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
-        if (settings.isSetting(FileStoreFactory.SETTING_FILE_STORE_PATH)) {
-            impliedMessageStoreFactories.add(new FileStoreFactory(settings));
-        }
-    }
-
-    private void isJdbcStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
-        if (settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER)) {
-            impliedMessageStoreFactories.add(new JdbcStoreFactory(settings));
-        }
-    }
-
-    private LogFactory inferLogFactory(SessionSettings settings) throws ConfigError {
-        Set<LogFactory> impliedLogFactories = new HashSet<LogFactory>();
-        isFileLog(settings, impliedLogFactories);
-        isScreenLog(settings, impliedLogFactories);
-        isSL4JLog(settings, impliedLogFactories);
-        isJdbcLog(settings, impliedLogFactories);
-        if (impliedLogFactories.size() > 1) {
-            throw new ConfigError("Ambiguous log factory implied in configuration");
-        }
-        LogFactory sessionLogFactory;
-        if (impliedLogFactories.size() == 1) {
-            sessionLogFactory = (LogFactory) impliedLogFactories.iterator().next();
-        } else {
-            // Default
-            sessionLogFactory = new ScreenLogFactory(settings);
-        }
-        LOG.info("Inferring log factory: " + sessionLogFactory.getClass().getName());
-        return sessionLogFactory;
-    }
-
-    private void isScreenLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
-        if (settings.isSetting(ScreenLogFactory.SETTING_LOG_EVENTS)
-                || settings.isSetting(ScreenLogFactory.SETTING_LOG_INCOMING)
-                || settings.isSetting(ScreenLogFactory.SETTING_LOG_OUTGOING)) {
-            impliedLogFactories.add(new ScreenLogFactory(settings));
-        }
-    }
-
-    private void isFileLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
-        if (settings.isSetting(FileLogFactory.SETTING_FILE_LOG_PATH)) {
-            impliedLogFactories.add(new FileLogFactory(settings));
-        }
-    }
-
-    private void isJdbcLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
-        if (settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER) && settings.isSetting(JdbcSetting.SETTING_LOG_EVENT_TABLE)) {
-            impliedLogFactories.add(new JdbcLogFactory(settings));
-        }
-    }
-
-    private void isSL4JLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
-        for (Object key : settings.getDefaultProperties().keySet()) {
-            if (key.toString().startsWith("SLF4J")) {
-                impliedLogFactories.add(new SLF4JLogFactory(settings));
-                return;
-            }
-        }
-    }
-
-    private boolean isConnectorRole(SessionSettings settings, String connectorRole) throws ConfigError {
-        boolean hasRole = false;
-        Iterator<SessionID> sessionIdItr = settings.sectionIterator();
-        while (sessionIdItr.hasNext()) {
-            try {
-                if (connectorRole.equals(settings.getString((SessionID) sessionIdItr.next(),
-                        SessionFactory.SETTING_CONNECTION_TYPE))) {
-                    hasRole = true;
-                    break;
-                }
-            } catch (FieldConvertError e) {
-                throw new ConfigError(e);
-            }
-        }
-        return hasRole;
-    }
-    
-    public void addEventListener(QuickfixjEventListener listener) {
-        eventListeners.add(listener);
-    }
-    
-    public void removeEventListener(QuickfixjEventListener listener) {
-        eventListeners.remove(listener);
-    }
-
-    private class Dispatcher implements Application {
-        public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
-            try {
-                dispatch(QuickfixjEventCategory.AdminMessageReceived, sessionID, message);
-            } catch (RuntimeException e) {
-                throw e;
-            } catch (Exception e) {
-                rethrowIfType(e, FieldNotFound.class);
-                rethrowIfType(e, IncorrectDataFormat.class);
-                rethrowIfType(e, IncorrectTagValue.class);
-                rethrowIfType(e, RejectLogon.class);               
-                throw new DispatcherException(e);
-            }
-        }
-        
-        public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
-            try {
-                dispatch(QuickfixjEventCategory.AppMessageReceived, sessionID, message);
-            } catch (RuntimeException e) {
-                throw e;
-            } catch (Exception e) {
-                rethrowIfType(e, FieldNotFound.class);
-                rethrowIfType(e, IncorrectDataFormat.class);
-                rethrowIfType(e, IncorrectTagValue.class);
-                rethrowIfType(e, UnsupportedMessageType.class);
-                throw new DispatcherException(e);
-            }
-        }
-
-        public void onCreate(SessionID sessionID) {
-            try {
-                dispatch(QuickfixjEventCategory.SessionCreated, sessionID, null);
-            } catch (Exception e) {
-                throw new DispatcherException(e);
-            }
-        }
-
-        public void onLogon(SessionID sessionID) {
-            try {
-                dispatch(QuickfixjEventCategory.SessionLogon, sessionID, null);
-            } catch (Exception e) {
-                throw new DispatcherException(e);
-            }
-        }
-
-        public void onLogout(SessionID sessionID) {
-            try {
-                dispatch(QuickfixjEventCategory.SessionLogoff, sessionID, null);
-            } catch (Exception e) {
-                throw new DispatcherException(e);
-            }
-        }
-
-        public void toAdmin(Message message, SessionID sessionID) {
-            try {
-                dispatch(QuickfixjEventCategory.AdminMessageSent, sessionID, message);
-            } catch (Exception e) {
-                throw new DispatcherException(e);
-            }
-        }
-
-        public void toApp(Message message, SessionID sessionID) throws DoNotSend {
-            try {
-                dispatch(QuickfixjEventCategory.AppMessageSent, sessionID, message);
-            } catch (Exception e) {
-                throw new DispatcherException(e);
-            }
-        }
-        
-        @SuppressWarnings("unchecked")
-        private <T extends Exception> void rethrowIfType(Exception e, Class<T> exceptionClass) throws T {
-            throw (T) e;
-        }
-
-        private void dispatch(QuickfixjEventCategory quickfixjEventCategory, SessionID sessionID, Message message) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("FIX event dispatched: {} {}", quickfixjEventCategory, message != null ? message : "");
-            }
-            for (QuickfixjEventListener listener : eventListeners) {
-                // Exceptions propagate back to the FIX engine so sequence numbers can be adjusted
-                listener.onEvent(quickfixjEventCategory, sessionID, message);
-            }
-        }
-        
-        @SuppressWarnings("serial")
-        private class DispatcherException extends RuntimeException {
-            public DispatcherException(Throwable cause) {
-                super(cause);
-            }
-        }
-    }
-        
-    public String getUri() {
-        return uri;
-    }
-
-    public MessageCorrelator getMessageCorrelator() {
-        return messageCorrelator;
-    }
-
-    // For Testing
-    Initiator getInitiator() {
-        return initiator;
-    }
-
-    // For Testing
-    Acceptor getAcceptor() {
-        return acceptor;
-    }
-
-    // For Testing
-    MessageStoreFactory getMessageStoreFactory() {
-        return messageStoreFactory;
-    }
-
-    // For Testing
-    LogFactory getLogFactory() {
-        return sessionLogFactory;
-    }
-
-    // For Testing
-    MessageFactory getMessageFactory() {
-        return messageFactory;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.management.JMException;
+
+import org.apache.camel.util.ObjectHelper;
+import org.quickfixj.jmx.JmxExporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.Acceptor;
+import quickfix.Application;
+import quickfix.ConfigError;
+import quickfix.DefaultMessageFactory;
+import quickfix.DoNotSend;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FileLogFactory;
+import quickfix.FileStoreFactory;
+import quickfix.IncorrectDataFormat;
+import quickfix.IncorrectTagValue;
+import quickfix.Initiator;
+import quickfix.JdbcLogFactory;
+import quickfix.JdbcSetting;
+import quickfix.JdbcStoreFactory;
+import quickfix.LogFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.Message;
+import quickfix.MessageFactory;
+import quickfix.MessageStoreFactory;
+import quickfix.RejectLogon;
+import quickfix.SLF4JLogFactory;
+import quickfix.ScreenLogFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.SleepycatStoreFactory;
+import quickfix.SocketAcceptor;
+import quickfix.SocketInitiator;
+import quickfix.ThreadedSocketAcceptor;
+import quickfix.ThreadedSocketInitiator;
+import quickfix.UnsupportedMessageType;
+
+/**
+ * This is a wrapper class that provides QuickFIX/J initialization capabilities
+ * beyond those supported in the core QuickFIX/J distribution.
+ * 
+ * Specifically, it infers dependencies on specific implementations of message
+ * stores and logs. It also supports extended QuickFIX/J settings properties to
+ * specify threading models, custom store and log implementations, etc.
+ * 
+ * The wrapper will create an initiator or acceptor or both depending on the
+ * roles of sessions described in the settings file.
+ */
+public class QuickfixjEngine {
+    public static final String DEFAULT_START_TIME = "00:00:00";
+    public static final String DEFAULT_END_TIME = "00:00:00";
+    public static final long DEFAULT_HEARTBTINT = 30;
+    public static final String SETTING_THREAD_MODEL = "ThreadModel";
+    public static final String SETTING_USE_JMX = "UseJmx";
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);
+
+    private final Acceptor acceptor;
+    private final Initiator initiator;
+    private final JmxExporter jmxExporter;
+    private final boolean forcedShutdown;
+    private final MessageStoreFactory messageStoreFactory;
+    private final LogFactory sessionLogFactory;
+    private final MessageFactory messageFactory;
+    private final MessageCorrelator messageCorrelator = new MessageCorrelator();
+
+    private boolean started;
+    private final List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
+
+    private final String uri;
+
+    /**
+     * Threading models selectable through the {@link #SETTING_THREAD_MODEL} setting.
+     */
+    public enum ThreadModel {
+        ThreadPerConnector, ThreadPerSession;
+    }
+
+    /**
+     * Creates an engine from a settings resource, inferring the message store,
+     * log and message factories from the settings.
+     */
+    public QuickfixjEngine(String uri, String settingsResourceName, boolean forcedShutdown)
+        throws ConfigError, FieldConvertError, IOException, JMException {
+
+        this(uri, settingsResourceName, forcedShutdown, null, null, null);
+    }
+
+    /**
+     * Creates an engine from a settings resource. Any factory override that is
+     * <code>null</code> is replaced by a default or inferred implementation.
+     */
+    public QuickfixjEngine(String uri, String settingsResourceName, boolean forcedShutdown,
+            MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
+            MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
+        this(uri, loadSettings(settingsResourceName), forcedShutdown, messageStoreFactoryOverride,
+                sessionLogFactoryOverride, messageFactoryOverride);
+    }
+
+    /**
+     * Creates an engine from already loaded settings. Any factory override that is
+     * <code>null</code> is replaced by a default or inferred implementation.
+     * Missing start time, end time and heartbeat interval settings are filled in
+     * with the <code>DEFAULT_*</code> constants of this class.
+     *
+     * @throws ConfigError if the settings are ambiguous or define neither an
+     *             acceptor nor an initiator session
+     */
+    public QuickfixjEngine(String uri, SessionSettings settings, boolean forcedShutdown,
+            MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
+            MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
+
+        addEventListener(messageCorrelator);
+
+        this.uri = uri;
+        this.forcedShutdown = forcedShutdown;
+
+        messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory();
+        sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings);
+        messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings);
+
+        // Set default session schedule if not specified in configuration
+        if (!settings.isSetting(Session.SETTING_START_TIME)) {
+            settings.setString(Session.SETTING_START_TIME, DEFAULT_START_TIME);
+        }
+        if (!settings.isSetting(Session.SETTING_END_TIME)) {
+            settings.setString(Session.SETTING_END_TIME, DEFAULT_END_TIME);
+        }
+        // Default heartbeat interval
+        if (!settings.isSetting(Session.SETTING_HEARTBTINT)) {
+            settings.setLong(Session.SETTING_HEARTBTINT, DEFAULT_HEARTBTINT);
+        }
+
+        // Allow specification of the QFJ threading model
+        ThreadModel threadModel = ThreadModel.ThreadPerConnector;
+        if (settings.isSetting(SETTING_THREAD_MODEL)) {
+            threadModel = ThreadModel.valueOf(settings.getString(SETTING_THREAD_MODEL));
+        }
+
+        if (settings.isSetting(SETTING_USE_JMX) && settings.getBool(SETTING_USE_JMX)) {
+            LOG.info("Enabling JMX for QuickFIX/J");
+            jmxExporter = new JmxExporter();
+        } else {
+            jmxExporter = null;
+        }
+
+        // From original component implementation...
+        // To avoid this exception in OSGi platform
+        // java.lang.NoClassDefFoundError: quickfix/fix41/MessageFactory
+        ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            if (isConnectorRole(settings, SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
+                acceptor = createAcceptor(new Dispatcher(), settings, messageStoreFactory,
+                    sessionLogFactory, messageFactory, threadModel);
+            } else {
+                acceptor = null;
+            }
+
+            if (isConnectorRole(settings, SessionFactory.INITIATOR_CONNECTION_TYPE)) {
+                initiator = createInitiator(new Dispatcher(), settings, messageStoreFactory,
+                    sessionLogFactory, messageFactory, threadModel);
+            } else {
+                initiator = null;
+            }
+
+            if (acceptor == null && initiator == null) {
+                throw new ConfigError("No connector role");
+            }
+        } finally {
+            // Always restore the caller's context class loader
+            Thread.currentThread().setContextClassLoader(ccl);
+        }
+    }
+
+    /**
+     * Loads the session settings from the named resource and closes the
+     * underlying stream afterwards.
+     *
+     * @throws IllegalArgumentException if the resource cannot be loaded
+     */
+    private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError {
+        InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
+        if (inputStream == null) {
+            throw new IllegalArgumentException("Could not load " + settingsResourceName);
+        }
+        try {
+            return new SessionSettings(inputStream);
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                // A failed close must not mask the outcome of parsing the settings
+                LOG.debug("Could not close settings resource " + settingsResourceName, e);
+            }
+        }
+    }
+
+    /**
+     * Starts the acceptor and/or initiator and, when JMX is enabled,
+     * registers each started connector with the JMX exporter.
+     */
+    public void start() throws Exception {
+        if (acceptor != null) {
+            acceptor.start();
+            if (jmxExporter != null) {
+                jmxExporter.register(acceptor);
+            }
+        }
+        if (initiator != null) {
+            initiator.start();
+            if (jmxExporter != null) {
+                jmxExporter.register(initiator);
+            }
+        }
+        started = true;
+    }
+
+    /**
+     * Stops the engine using the forced shutdown flag given at construction time.
+     */
+    public void stop() throws Exception {
+        stop(forcedShutdown);
+    }
+
+    /**
+     * Stops the acceptor and/or initiator.
+     *
+     * @param force passed on to the connectors' <code>stop(boolean)</code>
+     */
+    public void stop(boolean force) throws Exception {
+        if (acceptor != null) {
+            acceptor.stop(force);
+        }
+        if (initiator != null) {
+            initiator.stop(force);
+        }
+        started = false;
+    }
+
+    public boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Creates the initiator implementation matching the given thread model.
+     */
+    private Initiator createInitiator(Application application, SessionSettings settings,
+            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory,
+            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
+
+        Initiator initiator;
+        if (threadModel == ThreadModel.ThreadPerSession) {
+            initiator = new ThreadedSocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else if (threadModel == ThreadModel.ThreadPerConnector) {
+            initiator = new SocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else {
+            throw new ConfigError("Unknown thread mode: " + threadModel);
+        }
+        return initiator;
+    }
+
+    /**
+     * Creates the acceptor implementation matching the given thread model.
+     */
+    private Acceptor createAcceptor(Application application, SessionSettings settings,
+            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory,
+            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
+
+        Acceptor acceptor;
+        if (threadModel == ThreadModel.ThreadPerSession) {
+            acceptor = new ThreadedSocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else if (threadModel == ThreadModel.ThreadPerConnector) {
+            acceptor = new SocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else {
+            throw new ConfigError("Unknown thread mode: " + threadModel);
+        }
+        return acceptor;
+    }
+
+    /**
+     * Picks the message store factory implied by the settings, falling back
+     * to an in-memory store when no store-specific setting is present.
+     *
+     * @throws ConfigError if more than one store implementation is implied
+     */
+    private MessageStoreFactory inferMessageStoreFactory(SessionSettings settings) throws ConfigError {
+        Set<MessageStoreFactory> impliedMessageStoreFactories = new HashSet<MessageStoreFactory>();
+        isJdbcStore(settings, impliedMessageStoreFactories);
+        isFileStore(settings, impliedMessageStoreFactories);
+        isSleepycatStore(settings, impliedMessageStoreFactories);
+        if (impliedMessageStoreFactories.size() > 1) {
+            throw new ConfigError("Ambiguous message store implied in configuration.");
+        }
+        MessageStoreFactory messageStoreFactory;
+        if (impliedMessageStoreFactories.size() == 1) {
+            messageStoreFactory = impliedMessageStoreFactories.iterator().next();
+        } else {
+            messageStoreFactory = new MemoryStoreFactory();
+        }
+        LOG.info("Inferring message store factory: {}", messageStoreFactory.getClass().getName());
+        return messageStoreFactory;
+    }
+
+    private void isSleepycatStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR)) {
+            impliedMessageStoreFactories.add(new SleepycatStoreFactory(settings));
+        }
+    }
+
+    private void isFileStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(FileStoreFactory.SETTING_FILE_STORE_PATH)) {
+            impliedMessageStoreFactories.add(new FileStoreFactory(settings));
+        }
+    }
+
+    private void isJdbcStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER)) {
+            impliedMessageStoreFactories.add(new JdbcStoreFactory(settings));
+        }
+    }
+
+    /**
+     * Picks the log factory implied by the settings, falling back to the
+     * screen log when no log-specific setting is present.
+     *
+     * @throws ConfigError if more than one log implementation is implied
+     */
+    private LogFactory inferLogFactory(SessionSettings settings) throws ConfigError {
+        Set<LogFactory> impliedLogFactories = new HashSet<LogFactory>();
+        isFileLog(settings, impliedLogFactories);
+        isScreenLog(settings, impliedLogFactories);
+        isSL4JLog(settings, impliedLogFactories);
+        isJdbcLog(settings, impliedLogFactories);
+        if (impliedLogFactories.size() > 1) {
+            throw new ConfigError("Ambiguous log factory implied in configuration");
+        }
+        LogFactory sessionLogFactory;
+        if (impliedLogFactories.size() == 1) {
+            sessionLogFactory = impliedLogFactories.iterator().next();
+        } else {
+            // Default
+            sessionLogFactory = new ScreenLogFactory(settings);
+        }
+        LOG.info("Inferring log factory: {}", sessionLogFactory.getClass().getName());
+        return sessionLogFactory;
+    }
+
+    private void isScreenLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (settings.isSetting(ScreenLogFactory.SETTING_LOG_EVENTS)
+                || settings.isSetting(ScreenLogFactory.SETTING_LOG_INCOMING)
+                || settings.isSetting(ScreenLogFactory.SETTING_LOG_OUTGOING)) {
+            impliedLogFactories.add(new ScreenLogFactory(settings));
+        }
+    }
+
+    private void isFileLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (settings.isSetting(FileLogFactory.SETTING_FILE_LOG_PATH)) {
+            impliedLogFactories.add(new FileLogFactory(settings));
+        }
+    }
+
+    private void isJdbcLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER) && settings.isSetting(JdbcSetting.SETTING_LOG_EVENT_TABLE)) {
+            impliedLogFactories.add(new JdbcLogFactory(settings));
+        }
+    }
+
+    private void isSL4JLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        // Any default property whose key starts with "SLF4J" implies the SLF4J log
+        for (Object key : settings.getDefaultProperties().keySet()) {
+            if (key.toString().startsWith("SLF4J")) {
+                impliedLogFactories.add(new SLF4JLogFactory(settings));
+                return;
+            }
+        }
+    }
+
+    /**
+     * Tells whether at least one session section in the settings declares the
+     * given connection type.
+     */
+    private boolean isConnectorRole(SessionSettings settings, String connectorRole) throws ConfigError {
+        Iterator<SessionID> sessionIdItr = settings.sectionIterator();
+        while (sessionIdItr.hasNext()) {
+            try {
+                if (connectorRole.equals(settings.getString(sessionIdItr.next(),
+                        SessionFactory.SETTING_CONNECTION_TYPE))) {
+                    return true;
+                }
+            } catch (FieldConvertError e) {
+                throw new ConfigError(e);
+            }
+        }
+        return false;
+    }
+
+    public void addEventListener(QuickfixjEventListener listener) {
+        eventListeners.add(listener);
+    }
+
+    public void removeEventListener(QuickfixjEventListener listener) {
+        eventListeners.remove(listener);
+    }
+
+    /**
+     * QuickFIX/J application callback that forwards every engine event to the
+     * registered {@link QuickfixjEventListener}s.
+     */
+    private class Dispatcher implements Application {
+        public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
+            try {
+                dispatch(QuickfixjEventCategory.AdminMessageReceived, sessionID, message);
+            } catch (RuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                // Declared exceptions pass through unchanged, anything else is wrapped
+                rethrowIfType(e, FieldNotFound.class);
+                rethrowIfType(e, IncorrectDataFormat.class);
+                rethrowIfType(e, IncorrectTagValue.class);
+                rethrowIfType(e, RejectLogon.class);
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
+            try {
+                dispatch(QuickfixjEventCategory.AppMessageReceived, sessionID, message);
+            } catch (RuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                // Declared exceptions pass through unchanged, anything else is wrapped
+                rethrowIfType(e, FieldNotFound.class);
+                rethrowIfType(e, IncorrectDataFormat.class);
+                rethrowIfType(e, IncorrectTagValue.class);
+                rethrowIfType(e, UnsupportedMessageType.class);
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void onCreate(SessionID sessionID) {
+            try {
+                dispatch(QuickfixjEventCategory.SessionCreated, sessionID, null);
+            } catch (Exception e) {
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void onLogon(SessionID sessionID) {
+            try {
+                dispatch(QuickfixjEventCategory.SessionLogon, sessionID, null);
+            } catch (Exception e) {
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void onLogout(SessionID sessionID) {
+            try {
+                dispatch(QuickfixjEventCategory.SessionLogoff, sessionID, null);
+            } catch (Exception e) {
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void toAdmin(Message message, SessionID sessionID) {
+            try {
+                dispatch(QuickfixjEventCategory.AdminMessageSent, sessionID, message);
+            } catch (Exception e) {
+                throw new DispatcherException(e);
+            }
+        }
+
+        public void toApp(Message message, SessionID sessionID) throws DoNotSend {
+            try {
+                dispatch(QuickfixjEventCategory.AppMessageSent, sessionID, message);
+            } catch (Exception e) {
+                throw new DispatcherException(e);
+            }
+        }
+
+        /**
+         * Rethrows the exception only when it is an instance of the given class;
+         * otherwise returns normally so the caller can try the next type.
+         * The check is done reflectively because a generic cast to T is erased
+         * at runtime and would never fail.
+         */
+        private <T extends Exception> void rethrowIfType(Exception e, Class<T> exceptionClass) throws T {
+            if (exceptionClass.isInstance(e)) {
+                throw exceptionClass.cast(e);
+            }
+        }
+
+        private void dispatch(QuickfixjEventCategory quickfixjEventCategory, SessionID sessionID, Message message) throws Exception {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("FIX event dispatched: {} {}", quickfixjEventCategory, message != null ? message : "");
+            }
+            for (QuickfixjEventListener listener : eventListeners) {
+                // Exceptions propagate back to the FIX engine so sequence numbers can be adjusted
+                listener.onEvent(quickfixjEventCategory, sessionID, message);
+            }
+        }
+
+        /**
+         * Unchecked wrapper for listener failures that the callback signature
+         * does not allow to be thrown directly.
+         */
+        @SuppressWarnings("serial")
+        private class DispatcherException extends RuntimeException {
+            public DispatcherException(Throwable cause) {
+                super(cause);
+            }
+        }
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public MessageCorrelator getMessageCorrelator() {
+        return messageCorrelator;
+    }
+
+    // For Testing
+    Initiator getInitiator() {
+        return initiator;
+    }
+
+    // For Testing
+    Acceptor getAcceptor() {
+        return acceptor;
+    }
+
+    // For Testing
+    MessageStoreFactory getMessageStoreFactory() {
+        return messageStoreFactory;
+    }
+
+    // For Testing
+    LogFactory getLogFactory() {
+        return sessionLogFactory;
+    }
+
+    // For Testing
+    MessageFactory getMessageFactory() {
+        return messageFactory;
+    }
+}

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java Fri Dec  2 17:03:07 2011
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-/**
- * FIX message types provided by the QuickfixjEngine. This is used in the
- * Camel exchange headers to specify the category of the message payload.
- * This allow Camel to route and filter the messages based on the category.
- */
-public enum QuickfixjEventCategory {
-    AppMessageReceived,
-    AppMessageSent,
-    AdminMessageReceived,
-    AdminMessageSent,
-    SessionCreated,
-    SessionLogon,
-    SessionLogoff
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+/**
+ * FIX message types provided by the QuickfixjEngine. This is used in the
+ * Camel exchange headers to specify the category of the message payload.
+ * This allows Camel to route and filter the messages based on the category.
+ */
+public enum QuickfixjEventCategory {
+    // Dispatched from the engine's fromApp callback
+    AppMessageReceived,
+    // Dispatched from the engine's toApp callback
+    AppMessageSent,
+    // Dispatched from the engine's fromAdmin callback
+    AdminMessageReceived,
+    // Dispatched from the engine's toAdmin callback
+    AdminMessageSent,
+    // Dispatched from the engine's onCreate callback (no message payload)
+    SessionCreated,
+    // Dispatched from the engine's onLogon callback (no message payload)
+    SessionLogon,
+    // Dispatched from the engine's onLogout callback (no message payload)
+    SessionLogoff
 }
\ No newline at end of file

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java Fri Dec  2 17:03:07 2011
@@ -1,28 +1,28 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import quickfix.Message;
-import quickfix.SessionID;
-
-/**
- * Listens for FIX messages forwarded from the QuickfixjEngine
- * @see QuickfixjEngine
- */
-public interface QuickfixjEventListener {
-    void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+/**
+ * Listens for FIX messages forwarded from the QuickfixjEngine
+ * @see QuickfixjEngine
+ */
+public interface QuickfixjEventListener {
+    void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception;
+}

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1209585&r1=1209584&r2=1209585&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java Fri Dec  2 17:03:07 2011
@@ -1,82 +1,82 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import java.util.concurrent.Callable;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
-
-import quickfix.Message;
-import quickfix.MessageUtils;
-import quickfix.Session;
-import quickfix.SessionID;
-
-public class QuickfixjProducer extends DefaultProducer {
-    public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout";
-    public static final String CORRELATION_CRITERIA_KEY = "CorrelationCriteria";
-    
-    private final SessionID sessionID;
-    
-    public QuickfixjProducer(Endpoint endpoint) {
-        super(endpoint);
-        sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID();
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        sendMessage(exchange, exchange.getIn());
-    }
-
-    void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) throws InterruptedException {
-        try {
-            Message message = camelMessage.getBody(Message.class);
-            log.debug("Sending FIX message: {}", message);
-
-            SessionID messageSessionID = sessionID;
-            if (messageSessionID == null) {
-                messageSessionID = MessageUtils.getSessionID(message);
-            }
-
-            Session session = getSession(messageSessionID);
-            if (session == null) {
-                throw new IllegalStateException("Unknown session: " + messageSessionID);
-            }
-
-            Callable<Message> callable = null;
-
-            if (exchange.getPattern().isOutCapable()) {
-                QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
-                MessageCorrelator messageCorrelator = endpoint.getEngine().getMessageCorrelator();
-                callable = messageCorrelator.getReply(endpoint.getSessionID(), exchange);
-            }
-
-            session.send(message);
-
-            if (callable != null) {
-                Message reply = callable.call();
-                exchange.getOut().setBody(reply);
-            }
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-    }
-
-    Session getSession(SessionID messageSessionID) {
-        return Session.lookupSession(messageSessionID);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.concurrent.Callable;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+
+public class QuickfixjProducer extends DefaultProducer {
+    public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout";
+    public static final String CORRELATION_CRITERIA_KEY = "CorrelationCriteria";
+    
+    private final SessionID sessionID;
+    
+    public QuickfixjProducer(Endpoint endpoint) {
+        super(endpoint);
+        sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID();
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        sendMessage(exchange, exchange.getIn());
+    }
+
+    void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) throws InterruptedException {
+        try {
+            Message message = camelMessage.getBody(Message.class);
+            log.debug("Sending FIX message: {}", message);
+
+            SessionID messageSessionID = sessionID;
+            if (messageSessionID == null) {
+                messageSessionID = MessageUtils.getSessionID(message);
+            }
+
+            Session session = getSession(messageSessionID);
+            if (session == null) {
+                throw new IllegalStateException("Unknown session: " + messageSessionID);
+            }
+
+            Callable<Message> callable = null;
+
+            if (exchange.getPattern().isOutCapable()) {
+                QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+                MessageCorrelator messageCorrelator = endpoint.getEngine().getMessageCorrelator();
+                callable = messageCorrelator.getReply(endpoint.getSessionID(), exchange);
+            }
+
+            session.send(message);
+
+            if (callable != null) {
+                Message reply = callable.call();
+                exchange.getOut().setBody(reply);
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+    }
+
+    Session getSession(SessionID messageSessionID) {
+        return Session.lookupSession(messageSessionID);
+    }
+}

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
------------------------------------------------------------------------------
    svn:eol-style = native