You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2010/09/28 23:27:00 UTC

svn commit: r1002361 [1/3] - in /camel/trunk/components/camel-quickfix: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/camel/ src/main/java/org/apache/camel/component/ src/main/java/org/apache/cam...

Author: hadrian
Date: Tue Sep 28 21:26:58 2010
New Revision: 1002361

URL: http://svn.apache.org/viewvc?rev=1002361&view=rev
Log:
CAMEL-1350. Replace camel-quickfix with complete rewrite

Added:
    camel/trunk/components/camel-quickfix/
    camel/trunk/components/camel-quickfix/pom.xml
    camel/trunk/components/camel-quickfix/src/
    camel/trunk/components/camel-quickfix/src/main/
    camel/trunk/components/camel-quickfix/src/main/java/
    camel/trunk/components/camel-quickfix/src/main/java/org/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/
    camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
    camel/trunk/components/camel-quickfix/src/main/resources/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/LICENSE.txt
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/NOTICE.txt
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/
    camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/quickfixj
    camel/trunk/components/camel-quickfix/src/test/
    camel/trunk/components/camel-quickfix/src/test/java/
    camel/trunk/components/camel-quickfix/src/test/java/org/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorComponent.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjEventJsonTransformer.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonPrinter.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/util/
    camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/util/CountDownLatchDecrementer.java
    camel/trunk/components/camel-quickfix/src/test/resources/
    camel/trunk/components/camel-quickfix/src/test/resources/examples/
    camel/trunk/components/camel-quickfix/src/test/resources/examples/gateway.cfg
    camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg
    camel/trunk/components/camel-quickfix/src/test/resources/log4j.properties

Added: camel/trunk/components/camel-quickfix/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/pom.xml?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/pom.xml (added)
+++ camel/trunk/components/camel-quickfix/pom.xml Tue Sep 28 21:26:58 2010
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.camel</groupId>
+		<artifactId>camel-parent</artifactId>
+		<version>2.5-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>camel-quickfixj</artifactId>
+	<packaging>bundle</packaging>
+	<name>Camel :: QuickFIX/J</name>
+	<description>Camel QuickFIX/J support</description>
+
+	<properties>
+		<camel.osgi.export.pkg>
+			org.apache.camel.component.quickfixj.*
+        </camel.osgi.export.pkg>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>open.iona.m2-all</id>
+			<name>Fuse Source Maven Repo</name>
+			<url>http://repo.fusesource.com/maven2-all</url>
+		</repository>
+		<repository>
+			<id>apache.snapshots</id>
+			<name>Apache Snapshots</name>
+			<url>https://repository.apache.org/content/repositories/snapshots/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-spring</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.quickfixj</groupId>
+			<artifactId>quickfixj-all</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.mina</groupId>
+			<artifactId>mina-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>commons-logging</groupId>
+			<artifactId>commons-logging</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuickfixjComponent extends DefaultComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class);
+
+    private final Object engineInstancesLock = new Object();
+    private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>();
+    private final Map<String, QuickfixjEndpoint> endpoints = new HashMap<String, QuickfixjEndpoint>();
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        // Look up the engine instance based on the settings file ("remaining")
+        QuickfixjEngine engine;
+        synchronized (engineInstancesLock) {
+            QuickfixjEndpoint endpoint = endpoints.get(uri);
+
+            if (endpoint == null) {
+                engine = engines.get(remaining);
+                if (engine == null) {
+                    LOG.info("Creating QuickFIX/J engine using settings: " + remaining);
+                    engine = new QuickfixjEngine(remaining, false);
+                    engines.put(remaining, engine);
+                    if (isStarted()) {
+                        startQuickfixjEngine(engine);
+                    }
+                }
+                
+                endpoint = new QuickfixjEndpoint(uri, getCamelContext());
+                engine.addEventListener(endpoint);
+                endpoints.put(uri, endpoint);
+            }
+            
+            return endpoint;
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        LOG.info("QuickFIX/J component started");
+        synchronized (engineInstancesLock) {
+            for (QuickfixjEngine engine : engines.values()) {
+                startQuickfixjEngine(engine);            
+            }
+        }
+    }
+
+    private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception {
+        LOG.info("Starting QuickFIX/J engine: " + engine.getSettingsResourceName());
+        engine.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        synchronized (engineInstancesLock) {
+            for (QuickfixjEngine engine : engines.values()) {
+                engine.stop();           
+            }
+        }
+        LOG.info("QuickFIX/J component stopped");
+    }
+
+    // Test Support
+    Map<String, QuickfixjEngine> getEngines() {
+        return Collections.unmodifiableMap(engines);
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
+import org.apache.camel.impl.DefaultConsumer;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class QuickfixjConsumer extends DefaultConsumer {
+
+    public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) {
+        if (isStarted()) {
+            Exchange exchange = QuickfixjConverters.toExchange(getEndpoint(), sessionID, message, eventCategory);
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEventListener, MultipleConsumersSupport {
+    public static final String EVENT_CATEGORY_KEY = "EventCategory";
+    public static final String SESSION_ID_KEY = "SessionID";
+    public static final String MESSAGE_TYPE_KEY = "MessageType";
+    public static final String DATA_DICTIONARY_KEY = "DataDictionary";
+    
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class);
+    
+    private SessionID sessionID;
+    private List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
+    
+    public QuickfixjEndpoint(String uri, CamelContext context) {
+        super(uri, context);
+    }
+
+    protected SessionID getSessionID() {
+        return sessionID;
+    }
+
+    public void setSessionID(SessionID sessionID) {
+        this.sessionID = sessionID;
+    }
+    
+    public Consumer createConsumer(Processor processor) throws Exception {
+        LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? sessionID : "No Session"));
+        QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
+        // TODO It's not clear how the consumer lifecycle is managed
+        consumer.start();
+        consumers.add(consumer);
+        return consumer;
+    }
+
+    public Producer createProducer() throws Exception {
+        LOG.info("Creating QuickFIX/J producer: " + (sessionID != null ? sessionID : "No Session"));
+        return new QuickfixjProducer(this);
+    }
+
+    public boolean isSingleton() {
+        // TODO This seems to be incorrect. There can be multiple consumers for a session endpoint.
+        return true;
+    }
+
+    public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) {
+        if (this.sessionID == null || this.sessionID.equals(sessionID)) {
+            for (QuickfixjConsumer consumer : consumers) {
+                consumer.onEvent(eventCategory, sessionID, message);
+            }
+        }
+    }
+    
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.management.JMException;
+
+import org.apache.camel.util.ObjectHelper;
+import org.quickfixj.jmx.JmxExporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.Acceptor;
+import quickfix.Application;
+import quickfix.ConfigError;
+import quickfix.DefaultMessageFactory;
+import quickfix.DoNotSend;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FileLogFactory;
+import quickfix.FileStoreFactory;
+import quickfix.IncorrectDataFormat;
+import quickfix.IncorrectTagValue;
+import quickfix.Initiator;
+import quickfix.JdbcLogFactory;
+import quickfix.JdbcSetting;
+import quickfix.JdbcStoreFactory;
+import quickfix.LogFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.Message;
+import quickfix.MessageFactory;
+import quickfix.MessageStoreFactory;
+import quickfix.RejectLogon;
+import quickfix.ScreenLogFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.SleepycatStoreFactory;
+import quickfix.SocketAcceptor;
+import quickfix.SocketInitiator;
+import quickfix.ThreadedSocketAcceptor;
+import quickfix.ThreadedSocketInitiator;
+import quickfix.UnsupportedMessageType;
+
+/**
+ * This is a wrapper class that provided QuickFIX/J initialization capabilities
+ * beyond those supported in the core QuickFIX/J distribution.
+ * 
+ * Specifically, it infers dependencies on specific implementations of message
+ * stores and logs. It also supports extended QuickFIX/J settings properties to
+ * specify threading models, custom store and log implementations, etc.
+ * 
+ * The wrapper will create an initiator or acceptor or both depending on the
+ * roles of sessions described in the settings file.
+ */
+public class QuickfixjEngine {
+    public static final String DEFAULT_START_TIME = "00:00:00";
+    public static final String DEFAULT_END_TIME = "00:00:00";
+    public static final long DEFAULT_HEARTBTINT = 30;
+    public static final String SETTING_THREAD_MODEL = "ThreadModel";
+    public static final String SETTING_USE_JMX = "UseJmx";
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class);
+
+    private final Acceptor acceptor;
+    private final Initiator initiator;
+    private final boolean forcedShutdown;
+    private final MessageStoreFactory messageStoreFactory;
+    private final LogFactory sessionLogFactory;
+    private final MessageFactory messageFactory;
+
+    private boolean started;
+    private String settingsResourceName;
+    private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
+
+    public enum ThreadModel {
+        ThreadPerConnector, ThreadPerSession;
+    }
+
+    public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown) throws ConfigError, FieldConvertError, IOException, JMException {
+        this.forcedShutdown = forcedShutdown;
+        this.settingsResourceName = settingsResourceName;
+
+        InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName);
+        if (inputStream == null) {
+            throw new IllegalArgumentException("Could not load " + settingsResourceName);
+        }
+        
+        SessionSettings settings = new SessionSettings(inputStream);
+
+        // TODO Make the message factory configurable for advanced users
+        messageFactory = new DefaultMessageFactory();
+        sessionLogFactory = inferLogFactory(settings);
+        messageStoreFactory = inferMessageStoreFactory(settings);
+
+        // Set default session schedule if not specified in configuration        
+        if (!settings.isSetting(Session.SETTING_START_TIME)) {
+            settings.setString(Session.SETTING_START_TIME, DEFAULT_START_TIME);
+        }
+        if (!settings.isSetting(Session.SETTING_END_TIME)) {
+            settings.setString(Session.SETTING_END_TIME, DEFAULT_END_TIME);
+        }
+        // Default heartbeat interval
+        if (!settings.isSetting(Session.SETTING_HEARTBTINT)) {
+            settings.setLong(Session.SETTING_HEARTBTINT, DEFAULT_HEARTBTINT);
+        }
+        
+        // Allow specification of the QFJ threading model
+        ThreadModel threadModel = ThreadModel.ThreadPerConnector;
+        if (settings.isSetting(SETTING_THREAD_MODEL)) {
+            threadModel = ThreadModel.valueOf(settings.getString(SETTING_THREAD_MODEL));
+        }
+
+        JmxExporter jmxExporter = null;
+        if (settings.isSetting(SETTING_USE_JMX) && settings.getBool(SETTING_USE_JMX)) {
+            LOG.info("Enabling JMX for QuickFIX/J");
+            jmxExporter = new JmxExporter();
+        }
+        
+        // From original component implementation...
+        // To avoid this exception in OSGi platform
+        // java.lang.NoClassDefFoundError: quickfix/fix41/MessageFactory
+        ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+            if (isConnectorRole(settings, SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
+                acceptor = createAcceptor(new Dispatcher(), settings, 
+                    messageStoreFactory, sessionLogFactory, messageFactory, threadModel);
+
+                if (jmxExporter != null) {
+                    jmxExporter.export(acceptor);
+                }
+            } else {
+                acceptor = null;
+            }
+    
+            if (isConnectorRole(settings, SessionFactory.INITIATOR_CONNECTION_TYPE)) {
+                initiator = createInitiator(new Dispatcher(), settings, 
+                    messageStoreFactory, sessionLogFactory, messageFactory, threadModel);
+                
+                if (jmxExporter != null) {
+                    jmxExporter.export(initiator);
+                }
+            } else {
+                initiator = null;
+            }
+    
+            if (acceptor == null && initiator == null) {
+                throw new ConfigError("No connector role");
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(ccl);
+        }
+    }
+
+    public void start() throws Exception {
+        if (acceptor != null) {
+            acceptor.start();
+        }
+        if (initiator != null) {
+            initiator.start();
+        }
+        started = true;
+    }
+
+    public void stop() throws Exception {
+        stop(forcedShutdown);
+    }
+
+    public void stop(boolean force) throws Exception {
+        if (acceptor != null) {
+            acceptor.stop();
+        }
+        if (initiator != null) {
+            initiator.stop();
+        }
+        started = false;
+    }
+
+    public boolean isStarted() {
+        return started;
+    }
+    
+    private Initiator createInitiator(Application application, SessionSettings settings, 
+            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory, 
+            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
+        
+        Initiator initiator;
+        if (threadModel == ThreadModel.ThreadPerSession) {
+            initiator = new ThreadedSocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else if (threadModel == ThreadModel.ThreadPerConnector) {
+            initiator = new SocketInitiator(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else {
+            throw new ConfigError("Unknown thread mode: " + threadModel);
+        }
+        return initiator;
+    }
+
+    private Acceptor createAcceptor(Application application, SessionSettings settings,
+            MessageStoreFactory messageStoreFactory, LogFactory sessionLogFactory, 
+            MessageFactory messageFactory, ThreadModel threadModel) throws ConfigError {
+
+        Acceptor acceptor;
+        if (threadModel == ThreadModel.ThreadPerSession) {
+            acceptor = new ThreadedSocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else if (threadModel == ThreadModel.ThreadPerConnector) {
+            acceptor = new SocketAcceptor(application, messageStoreFactory, settings, sessionLogFactory, messageFactory);
+        } else {
+            throw new ConfigError("Unknown thread mode: " + threadModel);
+        }
+        return acceptor;
+    }
+
+    private MessageStoreFactory inferMessageStoreFactory(SessionSettings settings) throws ConfigError {
+        Set<MessageStoreFactory> impliedMessageStoreFactories = new HashSet<MessageStoreFactory>();
+        isJdbcStore(settings, impliedMessageStoreFactories);
+        isFileStore(settings, impliedMessageStoreFactories);
+        isSleepycatStore(settings, impliedMessageStoreFactories);
+        if (impliedMessageStoreFactories.size() > 1) {
+            throw new ConfigError("Ambiguous message store implied in configuration.");
+        }
+        MessageStoreFactory messageStoreFactory;
+        if (impliedMessageStoreFactories.size() == 1) {
+            messageStoreFactory = (MessageStoreFactory) impliedMessageStoreFactories.iterator().next();
+        } else {
+            messageStoreFactory = new MemoryStoreFactory();
+        }
+        LOG.info("Inferring message store factory: " + messageStoreFactory.getClass().getName());
+        return messageStoreFactory;
+    }
+
+    private void isSleepycatStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR)) {
+            impliedMessageStoreFactories.add(new SleepycatStoreFactory(settings));
+        }
+    }
+
+    private void isFileStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(FileStoreFactory.SETTING_FILE_STORE_PATH)) {
+            impliedMessageStoreFactories.add(new FileStoreFactory(settings));
+        }
+    }
+
+    private void isJdbcStore(SessionSettings settings, Set<MessageStoreFactory> impliedMessageStoreFactories) {
+        if (settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER)) {
+            impliedMessageStoreFactories.add(new JdbcStoreFactory(settings));
+        }
+    }
+
+    private LogFactory inferLogFactory(SessionSettings settings) throws ConfigError {
+        Set<LogFactory> impliedLogFactories = new HashSet<LogFactory>();
+        isFileLog(settings, impliedLogFactories);
+        isScreenLog(settings, impliedLogFactories);
+        isJdbcLog(settings, impliedLogFactories);
+        if (impliedLogFactories.size() > 1) {
+            throw new ConfigError("Ambiguous log factory implied in configuration");
+        }
+        LogFactory sessionLogFactory;
+        if (impliedLogFactories.size() == 1) {
+            sessionLogFactory = (LogFactory) impliedLogFactories.iterator().next();
+        } else {
+            // Default
+            sessionLogFactory = new ScreenLogFactory(settings);
+        }
+        LOG.info("Inferring log factory: " + sessionLogFactory.getClass().getName());
+        return sessionLogFactory;
+    }
+
+    private void isScreenLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (settings.isSetting(ScreenLogFactory.SETTING_LOG_EVENTS)
+                || settings.isSetting(ScreenLogFactory.SETTING_LOG_INCOMING)
+                || settings.isSetting(ScreenLogFactory.SETTING_LOG_OUTGOING)) {
+            impliedLogFactories.add(new ScreenLogFactory(settings));
+        }
+    }
+
+    private void isFileLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (settings.isSetting(FileLogFactory.SETTING_FILE_LOG_PATH)) {
+            impliedLogFactories.add(new FileLogFactory(settings));
+        }
+    }
+
+    private void isJdbcLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) {
+        if (impliedLogFactories.size() == 0 && settings.isSetting(JdbcSetting.SETTING_JDBC_DRIVER)) {
+            impliedLogFactories.add(new JdbcLogFactory(settings));
+        }
+    }
+
+    private boolean isConnectorRole(SessionSettings settings, String connectorRole) throws ConfigError {
+        boolean hasRole = false;
+        Iterator<SessionID> sessionIdItr = settings.sectionIterator();
+        while (sessionIdItr.hasNext()) {
+            try {
+                if (connectorRole.equals(settings.getString((SessionID) sessionIdItr.next(),
+                        SessionFactory.SETTING_CONNECTION_TYPE))) {
+                    hasRole = true;
+                    break;
+                }
+            } catch (FieldConvertError e) {
+                throw new ConfigError(e);
+            }
+        }
+        return hasRole;
+    }
+    
+    public void addEventListener(QuickfixjEventListener listener) {
+        eventListeners.add(listener);
+    }
+    
+    public void removeEventListener(QuickfixjEventListener listener) {
+        eventListeners.remove(listener);
+    }
+
+    private class Dispatcher implements Application {
+        public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
+            dispatch(QuickfixjEventCategory.AdminMessageReceived, sessionID, message);
+        }
+
+        public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
+            dispatch(QuickfixjEventCategory.AppMessageReceived, sessionID, message);
+        }
+
+        public void onCreate(SessionID sessionID) {
+            dispatch(QuickfixjEventCategory.SessionCreated, sessionID, null);
+        }
+
+        public void onLogon(SessionID sessionID) {
+            dispatch(QuickfixjEventCategory.SessionLogon, sessionID, null);
+        }
+
+        public void onLogout(SessionID sessionID) {
+            dispatch(QuickfixjEventCategory.SessionLogoff, sessionID, null);
+        }
+
+        public void toAdmin(Message message, SessionID sessionID) {
+            dispatch(QuickfixjEventCategory.AdminMessageSent, sessionID, message);
+        }
+
+        public void toApp(Message message, SessionID sessionID) throws DoNotSend {
+            dispatch(QuickfixjEventCategory.AppMessageSent, sessionID, message);
+        }
+        
+        private void dispatch(QuickfixjEventCategory quickfixjEventCategory, SessionID sessionID, Message message) {
+            // TODO Find a way to propagate exception to the QFJ engine (RejectLogon, DoNotSend, etc.)
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("FIX event dispatched: " + quickfixjEventCategory + " " + (message != null ? message : ""));
+            }
+            for (QuickfixjEventListener listener : eventListeners) {
+                try {
+                    listener.onEvent(quickfixjEventCategory, sessionID, message);
+                } catch (Exception e) {
+                    LOG.error("Error during event dispatching", e);
+                }
+            }
+        }
+    }
+    
+    public String getSettingsResourceName() {
+        return settingsResourceName;
+    }
+
+    // For Testing
+    Initiator getInitiator() {
+        return initiator;
+    }
+
+    // For Testing
+    Acceptor getAcceptor() {
+        return acceptor;
+    }
+
+    // For Testing
+    MessageStoreFactory getMessageStoreFactory() {
+        return messageStoreFactory;
+    }
+
+    // For Testing
+    LogFactory getLogFactory() {
+        return sessionLogFactory;
+    }
+
+    // For Testing
+    MessageFactory getMessageFactory() {
+        return messageFactory;
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventCategory.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+/**
+ * FIX message types provided by the QuickfixjEngine. This is used in the
+ * Camel exchange headers to specify the category of the message payload.
+ * This allow Camel to route and filter the messages based on the category.
+ */
+public enum QuickfixjEventCategory {
+    AppMessageReceived,
+    AppMessageSent,
+    AdminMessageReceived,
+    AdminMessageSent,
+    SessionCreated,
+    SessionLogon,
+    SessionLogoff
+}
\ No newline at end of file

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+/**
+ * Listens for FIX messages forwarded from the QuickfixjEngine
+ * @see QuickfixjEngine
+ */
+public interface QuickfixjEventListener {
+    
+    void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message);
+    
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+
+public class QuickfixjProducer extends DefaultProducer {
+    private final SessionID sessionID;
+
+    public QuickfixjProducer(Endpoint endpoint) {
+        super(endpoint);
+        sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID();
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        Message message = exchange.getIn().getBody(Message.class);
+        if (log.isDebugEnabled()) {
+            log.debug("Sending FIX message: " + message.toString());
+        }
+        
+        SessionID messageSessionID = sessionID;
+        if (messageSessionID == null) {
+            messageSessionID = MessageUtils.getSessionID(message);
+        }
+        
+        Session session = Session.lookupSession(messageSessionID);
+        if (session == null) {
+            exchange.setException(new IllegalStateException("Unknown session: " + messageSessionID));
+            return;
+        }
+        
+        session.send(message);
+    }
+
+}

Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.converter;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.ConfigError;
+import quickfix.DataDictionary;
+import quickfix.FieldNotFound;
+import quickfix.InvalidMessage;
+import quickfix.Message;
+import quickfix.Session;
+import quickfix.SessionID;
+import quickfix.field.MsgType;
+
+import static org.apache.camel.component.quickfixj.QuickfixjEndpoint.EVENT_CATEGORY_KEY;
+import static org.apache.camel.component.quickfixj.QuickfixjEndpoint.MESSAGE_TYPE_KEY;
+import static org.apache.camel.component.quickfixj.QuickfixjEndpoint.SESSION_ID_KEY;
+
+@Converter
+public final class QuickfixjConverters {
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjConverters.class);
+
+    private QuickfixjConverters() {
+        //Utility class
+    }
+
+    @Converter
+    public static SessionID toSessionID(String sessionID) {
+        return new SessionID(sessionID);
+    }
+    
+    @Converter
+    public static Message toMessage(String value, Exchange exchange) throws InvalidMessage, ConfigError {
+        DataDictionary dataDictionary = getDataDictionary(exchange);
+        return new Message(value, dataDictionary);
+    }
+
+    private static DataDictionary getDataDictionary(Exchange exchange) throws ConfigError {
+        DataDictionary dataDictionary = null;
+        
+        Object dictionaryValue = exchange.getProperties().get(QuickfixjEndpoint.DATA_DICTIONARY_KEY);
+        
+        if (dictionaryValue instanceof DataDictionary) {
+            dataDictionary = (DataDictionary)dictionaryValue;
+        }
+        else if (dictionaryValue instanceof String) {
+            dataDictionary = new DataDictionary((String) dictionaryValue);
+        }      
+        else {
+            SessionID sessionID = (SessionID) exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY);
+            Session session = Session.lookupSession(sessionID);
+            dataDictionary = session != null ? session.getDataDictionary() : null;
+        }
+        
+        return dataDictionary;
+    }
+    
+    public static Exchange toExchange(Endpoint endpoint, SessionID sessionID, Message message, QuickfixjEventCategory eventCategory) {
+        Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+        
+        org.apache.camel.Message camelMessage = exchange.getIn();
+        camelMessage.setHeader(EVENT_CATEGORY_KEY, eventCategory);
+        camelMessage.setHeader(SESSION_ID_KEY, sessionID);
+        
+        if (message != null) {
+            try {
+                camelMessage.setHeader(MESSAGE_TYPE_KEY, message.getHeader().getString(MsgType.FIELD));
+            } catch (FieldNotFound fieldNotFoundEx) {
+                LOG.error("Message type field not found in QFJ message, continuing...");
+            }
+        }
+        
+        camelMessage.setBody(message); 
+        
+        return exchange;
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/main/resources/META-INF/LICENSE.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/resources/META-INF/LICENSE.txt?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/resources/META-INF/LICENSE.txt (added)
+++ camel/trunk/components/camel-quickfix/src/main/resources/META-INF/LICENSE.txt Tue Sep 28 21:26:58 2010
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

Added: camel/trunk/components/camel-quickfix/src/main/resources/META-INF/NOTICE.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/resources/META-INF/NOTICE.txt?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/resources/META-INF/NOTICE.txt (added)
+++ camel/trunk/components/camel-quickfix/src/main/resources/META-INF/NOTICE.txt Tue Sep 28 21:26:58 2010
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.

Added: camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (added)
+++ camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Tue Sep 28 21:26:58 2010
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.quickfixj.converter
\ No newline at end of file

Added: camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/quickfixj
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/quickfixj?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/quickfixj (added)
+++ camel/trunk/components/camel-quickfix/src/main/resources/META-INF/services/org/apache/camel/component/quickfixj Tue Sep 28 21:26:58 2010
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.quickfixj.QuickfixjComponent

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.StaticMethodTypeConverter;
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import quickfix.Acceptor;
+import quickfix.FixVersions;
+import quickfix.Initiator;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.field.EmailThreadID;
+import quickfix.field.EmailType;
+import quickfix.field.SenderCompID;
+import quickfix.field.Subject;
+import quickfix.field.TargetCompID;
+import quickfix.fix44.Email;
+
+
+public class QuickfixjComponentTest {
+    private File settingsFile;
+    private File tempdir;
+    private ClassLoader contextClassLoader;
+    private SessionID sessionID;
+    private SessionSettings settings;
+    private QuickfixjComponent component;
+
+    private void setSessionID(SessionSettings sessionSettings, SessionID sessionID) {
+        sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, sessionID.getBeginString());
+        sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, sessionID.getSenderCompID());
+        sessionSettings.setString(sessionID, SessionSettings.TARGETCOMPID, sessionID.getTargetCompID());
+    }
+    
+    private String getEndpointUri(final String configFilename, SessionID sid) {
+        String uri = "quickfixj:" + configFilename;
+        if (sid != null) {
+            uri += "?sessionID=" + sid;
+        }
+        return uri;
+    }
+    @Before
+    public void setUp() throws Exception {
+        settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+        tempdir = settingsFile.getParentFile();
+        URL[] urls = new URL[] {tempdir.toURI().toURL()};
+       
+        contextClassLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+        Thread.currentThread().setContextClassLoader(testClassLoader);
+        
+        sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
+
+        settings = new SessionSettings();
+        settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+        setSessionID(settings, sessionID);   
+
+        DefaultCamelContext camelContext = new DefaultCamelContext();
+        component = new QuickfixjComponent();
+        component.setCamelContext(camelContext);
+        assertThat(component.getEngines().size(), is(0));
+
+        Method converterMethod = QuickfixjConverters.class.getMethod("toSessionID", new Class<?>[] {String.class});
+        camelContext.getTypeConverterRegistry().addTypeConverter(SessionID.class, String.class,  new StaticMethodTypeConverter(converterMethod));
+        
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Thread.currentThread().setContextClassLoader(contextClassLoader);   
+        component.stop();
+    }
+
+    @Test
+    public void createEndpointBeforeComponentStart() throws Exception {
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        assertThat(component.getEngines().size(), is(1));
+        assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+        assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
+        
+        // Should used cached QFJ engine
+        Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID));
+        
+        assertThat(component.getEngines().size(), is(1));
+        assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+        assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
+        
+        component.start();
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+        
+        // Move these too an endpoint testcase if one exists
+        assertThat(e2.isSingleton(), is(true));
+        assertThat(((MultipleConsumersSupport)e2).isMultipleConsumersSupported(), is(true));
+    }
+    
+    @Test
+    public void createEndpointAfterComponentStart() throws Exception {
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        component.start();
+
+        Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        assertThat(component.getEngines().size(), is(1));
+        assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+        assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue()));
+        
+        // Should used cached QFJ engine
+        Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID));
+        
+        assertThat(component.getEngines().size(), is(1));
+        assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue()));
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true));
+        assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID));
+    }
+
+    @Test
+    public void componentStop() throws Exception {
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+        writeSettings();
+
+        Endpoint endpoint = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        
+        final CountDownLatch latch = new CountDownLatch(1);
+        
+        Consumer consumer = endpoint.createConsumer(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                QuickfixjEventCategory eventCategory = 
+                    (QuickfixjEventCategory) exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY);
+                if (eventCategory == QuickfixjEventCategory.SessionCreated) {
+                    latch.countDown();
+                }
+            }
+        });
+        
+        // Endpoint automatically starts the consumer
+        assertThat(((ServiceSupport)consumer).isStarted(), is(true));
+        
+        component.start();
+        
+        assertTrue("Session not created", latch.await(5000, TimeUnit.MILLISECONDS));
+        
+        component.stop();
+        
+        assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false));
+    }
+
+    @Test
+    public void messagePublication() throws Exception {
+        // Create settings file with both acceptor and initiator
+        
+        SessionSettings settings = new SessionSettings();
+        settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+        
+        SessionID acceptorSessionID =  new SessionID(FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "INITIATOR");
+        settings.setString(acceptorSessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(acceptorSessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+        setSessionID(settings, acceptorSessionID);
+        
+        SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "INITIATOR", "ACCEPTOR");
+        settings.setString(initiatorSessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+        settings.setLong(initiatorSessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+        settings.setLong(initiatorSessionID, Initiator.SETTING_RECONNECT_INTERVAL, 1);
+        setSessionID(settings, initiatorSessionID);
+
+        writeSettings(settings);
+        
+        Endpoint endpoint = component.createEndpoint(getEndpointUri(settingsFile.getName(), null));
+        
+        // Start the component and wait for the FIX sessions to be logged on
+
+        final CountDownLatch logonLatch = new CountDownLatch(2);
+        final CountDownLatch messageLatch = new CountDownLatch(2);
+                
+        endpoint.createConsumer(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                QuickfixjEventCategory eventCategory = 
+                    (QuickfixjEventCategory) exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY);
+                if (eventCategory == QuickfixjEventCategory.SessionLogon) {
+                    logonLatch.countDown();
+                } else if (eventCategory == QuickfixjEventCategory.AppMessageReceived) {
+                    messageLatch.countDown();
+                }
+            }
+        });
+        
+        component.start();
+        
+        assertTrue("Session not created", logonLatch.await(5000, TimeUnit.MILLISECONDS));
+       
+        Endpoint producerEndpoint = component.createEndpoint(getEndpointUri(settingsFile.getName(), acceptorSessionID));
+        Producer producer = producerEndpoint.createProducer();
+        
+        // FIX message to send
+        Email email = new Email(new EmailThreadID("ID"), new EmailType(EmailType.NEW), new Subject("Test"));
+        Exchange exchange = producer.createExchange(ExchangePattern.InOnly);
+        exchange.getIn().setBody(email);
+        
+        producer.process(exchange);            
+
+        // Produce with no session ID specified, session ID must be in message
+        Producer producer2 = endpoint.createProducer();
+         
+        email.getHeader().setString(SenderCompID.FIELD, acceptorSessionID.getSenderCompID());
+        email.getHeader().setString(TargetCompID.FIELD, acceptorSessionID.getTargetCompID());
+
+        producer2.process(exchange);
+       
+        assertTrue("Messages not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
+    }
+
+    private void writeSettings() throws IOException {
+        writeSettings(settings);
+    }
+
+    private void writeSettings(SessionSettings settings) throws IOException {
+        FileOutputStream settingsOut = new FileOutputStream(settingsFile);
+        try {
+            settings.toStream(settingsOut);
+        } finally {
+            settingsOut.close();
+        }
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class QuickfixjConsumerTest {
+    private Exchange mockExchange;
+    private Processor mockProcessor;
+    private Endpoint mockEndpoint;
+
+    @Before
+    public void setUp() {
+        mockExchange = Mockito.mock(Exchange.class);
+        org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+        
+        mockProcessor = Mockito.mock(Processor.class);
+        
+        mockEndpoint = Mockito.mock(Endpoint.class);
+        Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);        
+    }
+    
+    @Test
+    public void processExchangeOnlyWhenStarted() throws Exception {
+        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
+        
+        Assert.assertThat("Consumer should not be automatically started", 
+            ((ServiceSupport)consumer).isStarted(), CoreMatchers.is(false));
+        
+        // Simulate a message from the FIX engine
+        SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");       
+        consumer.onEvent(QuickfixjEventCategory.AppMessageReceived, sessionID, new Message());
+        
+        // No expected interaction with processor since component is not started
+        Mockito.verifyZeroInteractions(mockProcessor);
+        
+        consumer.start();
+        Assert.assertThat(((ServiceSupport)consumer).isStarted(), CoreMatchers.is(true));
+        
+        // Simulate a message from the FIX engine
+        consumer.onEvent(QuickfixjEventCategory.AppMessageReceived, sessionID, new Message());
+        
+        // Second message should be processed
+        Mockito.verify(mockProcessor).process(Matchers.isA(Exchange.class));
+    }
+    
+    @Test
+    public void setExceptionOnExchange() throws Exception {
+              
+        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
+        consumer.start();
+        
+        Throwable exception = new Exception("Throwable for test");
+        Mockito.doThrow(exception).when(mockProcessor).process(mockExchange);
+        
+        // Simulate a message from the FIX engine
+        SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");       
+        consumer.onEvent(QuickfixjEventCategory.AppMessageReceived, sessionID, new Message());
+        
+        Mockito.verify(mockExchange).setException(exception);
+    }
+}