You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:29 UTC

[05/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
new file mode 100644
index 0000000..b2daca2
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType;
+import org.apache.qpid.jms.provider.discovery.DiscoveryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovery agent that listens on a multicast address for new Broker advisories.
+ */
+public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
+
+    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
+    public static final String DEFAULT_HOST_STR = "default";
+    public static final String DEFAULT_HOST_IP = System.getProperty("qpidjms.partition.discovery", "239.255.2.3");
+    public static final int DEFAULT_PORT = 6155;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
+    private static final int BUFF_SIZE = 8192;
+    private static final int DEFAULT_IDLE_TIME = 500;
+    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
+
+    private DiscoveryListener listener;
+    private URI discoveryURI;
+    private int timeToLive = 1;
+    private boolean loopBackMode;
+    private final Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
+    private String group = "default";
+    private InetAddress inetAddress;
+    private SocketAddress sockAddress;
+    private MulticastSocket mcast;
+    private Thread runner;
+    private long keepAliveInterval = DEFAULT_IDLE_TIME;
+    private String mcInterface;
+    private String mcNetworkInterface;
+    private String mcJoinNetworkInterface;
+    private String service;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private PacketParser parser;
+
+    public MulticastDiscoveryAgent(URI discoveryURI) {
+        this.discoveryURI = discoveryURI;
+    }
+
+    @Override
+    public void setDiscoveryListener(DiscoveryListener listener) {
+        this.listener = listener;
+    }
+
+    public DiscoveryListener getDiscoveryListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        if (listener == null) {
+            throw new IllegalStateException("No DiscoveryListener configured.");
+        }
+
+        if (started.compareAndSet(false, true)) {
+
+            if (group == null || group.length() == 0) {
+                throw new IOException("You must specify a group to discover");
+            }
+
+            if (discoveryURI == null) {
+                try {
+                    discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
+                } catch (URISyntaxException e) {
+                    // Default is always valid.
+                }
+            }
+
+            LOG.trace("mcast - discoveryURI = {}", discoveryURI);
+
+            String myHost = discoveryURI.getHost();
+            int myPort = discoveryURI.getPort();
+
+            if (DEFAULT_HOST_STR.equals(myHost)) {
+                myHost = DEFAULT_HOST_IP;
+            }
+
+            if (myPort < 0) {
+                myPort = DEFAULT_PORT;
+            }
+
+            LOG.trace("mcast - myHost = {}", myHost);
+            LOG.trace("mcast - myPort = {}", myPort);
+            LOG.trace("mcast - group = {}", group);
+            LOG.trace("mcast - interface = {}", mcInterface);
+            LOG.trace("mcast - network interface = {}", mcNetworkInterface);
+            LOG.trace("mcast - join network interface = {}", mcJoinNetworkInterface);
+
+            this.inetAddress = InetAddress.getByName(myHost);
+            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
+            mcast = new MulticastSocket(myPort);
+            mcast.setLoopbackMode(loopBackMode);
+            mcast.setTimeToLive(getTimeToLive());
+            if (mcJoinNetworkInterface != null) {
+                mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
+            } else {
+                mcast.joinGroup(inetAddress);
+            }
+            mcast.setSoTimeout((int) keepAliveInterval);
+            if (mcInterface != null) {
+                mcast.setInterface(InetAddress.getByName(mcInterface));
+            }
+            if (mcNetworkInterface != null) {
+                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
+            }
+            runner = new Thread(this);
+            runner.setName(this.toString() + ":" + runner.getName());
+            runner.setDaemon(true);
+            runner.start();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (started.compareAndSet(true, false)) {
+            if (mcast != null) {
+                mcast.close();
+            }
+            if (runner != null) {
+                runner.interrupt();
+            }
+        }
+    }
+
+    @Override
+    public void suspend() {
+        // We don't suspend multicast as it's mostly a passive listener.
+    }
+
+    @Override
+    public void resume() {
+        // We don't suspend multicast as it's mostly a passive listener.
+    }
+
+    @Override
+    public void run() {
+        byte[] buf = new byte[BUFF_SIZE];
+        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
+        while (started.get()) {
+            expireOldServices();
+            try {
+                mcast.receive(packet);
+                if (packet.getLength() > 0) {
+                    DiscoveryEvent event = parser.processPacket(packet.getData(), packet.getOffset(), packet.getLength());
+                    if (event != null) {
+                        if (event.getType() == EventType.ALIVE) {
+                            processAlive(event);
+                        } else {
+                            processShutdown(event);
+                        }
+                    }
+                }
+            } catch (SocketTimeoutException se) {
+                // ignore
+            } catch (IOException e) {
+                if (started.get()) {
+                    LOG.error("failed to process packet: {}", e.getMessage());
+                    LOG.trace(" packet processing failed by: {}", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "MulticastDiscoveryAgent: listener:" + getDiscvoeryURI();
+    }
+
+    //---------- Internal Implementation -------------------------------------//
+
+    private void processAlive(DiscoveryEvent event) {
+        RemoteBrokerData data = brokersByService.get(event.getPeerUri());
+        if (data == null) {
+            String peerUri = event.getPeerUri();
+            data = new RemoteBrokerData(event.getPeerUri());
+            brokersByService.put(peerUri, data);
+            fireServiceAddEvent(data);
+        } else {
+            data.updateHeartBeat();
+        }
+    }
+
+    private void processShutdown(DiscoveryEvent event) {
+        RemoteBrokerData data = brokersByService.remove(event.getPeerUri());
+        if (data != null) {
+            fireServiceRemovedEvent(data);
+        }
+    }
+
+    private void expireOldServices() {
+        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
+        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
+            RemoteBrokerData data = i.next();
+            if (data.getLastHeartBeat() < expireTime) {
+                processShutdown(data.asShutdownEvent());
+            }
+        }
+    }
+
+    private void fireServiceRemovedEvent(final RemoteBrokerData data) {
+        if (listener != null && started.get()) {
+            listener.onServiceRemove(data);
+        }
+    }
+
+    private void fireServiceAddEvent(final RemoteBrokerData data) {
+        if (listener != null && started.get()) {
+            listener.onServiceAdd(data);
+        }
+    }
+
+    // ---------- Property Accessors ------------------------------------------//
+
+    /**
+     * @return the original URI used to create the Discovery Agent.
+     */
+    public URI getDiscvoeryURI() {
+        return this.discoveryURI;
+    }
+
+    /**
+     * @return Returns the loopBackMode.
+     */
+    public boolean isLoopBackMode() {
+        return loopBackMode;
+    }
+
+    /**
+     * @param loopBackMode
+     *        The loopBackMode to set.
+     */
+    public void setLoopBackMode(boolean loopBackMode) {
+        this.loopBackMode = loopBackMode;
+    }
+
+    /**
+     * @return Returns the timeToLive.
+     */
+    public int getTimeToLive() {
+        return timeToLive;
+    }
+
+    /**
+     * @param timeToLive
+     *        The timeToLive to set.
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    public long getKeepAliveInterval() {
+        return keepAliveInterval;
+    }
+
+    public void setKeepAliveInterval(long keepAliveInterval) {
+        this.keepAliveInterval = keepAliveInterval;
+    }
+
+    public void setInterface(String mcInterface) {
+        this.mcInterface = mcInterface;
+    }
+
+    public void setNetworkInterface(String mcNetworkInterface) {
+        this.mcNetworkInterface = mcNetworkInterface;
+    }
+
+    public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
+        this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
+    }
+
+    /**
+     * @return the multicast group this agent is assigned to.
+     */
+    public String getGroup() {
+        return this.group;
+    }
+
+    /**
+     * Sets the multicast group this agent is assigned to.  The group can only be set
+     * prior to starting the agent, once started the group change will never take effect.
+     *
+     * @param group
+     *        the multicast group the agent is assigned to.
+     */
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    /**
+     * Returns the name of the service that is providing the discovery data for this agent such
+     * as ActiveMQ.
+     *
+     * @return the name of the service that is advertising remote peer data.
+     */
+    public String getService() {
+        return this.service;
+    }
+
+    /**
+     * Sets the name of the service that is providing the remote peer discovery data.
+     *
+     * @param name
+     *        the name of the service that provides this agent with remote peer data.
+     */
+    public void setService(String name) {
+        this.service = name;
+    }
+
+    /**
+     * @return the currently configured datagram packet parser for this agent.
+     */
+    public PacketParser getParser() {
+        return parser;
+    }
+
+    /**
+     * Sets the datagram packet parser used to read the discovery data broadcast by the service
+     * being monitored for remote peers.
+     *
+     * @param parser
+     *        the datagram packet parser to use.
+     */
+    public void setParser(PacketParser parser) {
+        this.parser = parser;
+    }
+
+    // ---------- Discovered Peer Bookkeeping Class ---------------------------//
+
+    private class RemoteBrokerData extends DiscoveryEvent {
+
+        long lastHeartBeat;
+
+        public RemoteBrokerData(String peerUri) {
+            super(peerUri, EventType.ALIVE);
+            this.lastHeartBeat = System.currentTimeMillis();
+        }
+
+        /**
+         * @return an event representing this remote peers shutdown event.
+         */
+        public DiscoveryEvent asShutdownEvent() {
+            return new DiscoveryEvent(getPeerUri(), EventType.SHUTDOWN);
+        }
+
+        public synchronized void updateHeartBeat() {
+            lastHeartBeat = System.currentTimeMillis();
+        }
+
+        public synchronized long getLastHeartBeat() {
+            return lastHeartBeat;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
new file mode 100644
index 0000000..6730f33
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgentFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+
+/**
+ * Creates and configures a new instance of the mutlicast agent.
+ */
+public class MulticastDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    private static final String DEFAULT_SERVICE = "activemq";
+
+    @Override
+    public DiscoveryAgent createDiscoveryAgent(URI discoveryURI) throws Exception {
+        MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(discoveryURI);
+        Map<String, String> options = URISupport.parseParameters(discoveryURI);
+        PropertyUtil.setProperties(agent, options);
+
+        String service = agent.getService();
+        if (service == null || service.isEmpty()) {
+            service = DEFAULT_SERVICE;
+        }
+
+        PacketParser packetParser = PacketParserFactory.createAgent(service);
+        packetParser.setGroup(agent.getGroup());
+
+        agent.setParser(packetParser);
+
+        return agent;
+    }
+
+    @Override
+    public String getName() {
+        return "multicast";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
new file mode 100644
index 0000000..294234e
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
+
+/**
+ * Interface for a DatagramPacket parser object which is used by the
+ * MulticastDiscoveryAget to parse incoming packets to determine the
+ * discovered peer information.
+ */
+public interface PacketParser {
+
+    /**
+     * @return the multicast group assignment for this parser.
+     */
+    String getGroup();
+
+    /**
+     * Sets the multicast group that the parent agent is assigned to.  This can
+     * be used in some cases to parse discovery messages.
+     *
+     * @param group
+     *        the multicast group that this parser's parent agent resides in./
+     */
+    void setGroup(String group);
+
+    /**
+     * Process in incoming event packet and create a DiscoveryEvent from the data.
+     *
+     * @param data
+     *        the new data packet to process.
+     * @param offset
+     *        the offset into the data buffer to start at.
+     * @param length
+     *        the length of the data packet contained in the buffer.
+     *
+     * @return a new DiscoveryEvent created from pocessing the incoming data.
+     */
+    DiscoveryEvent processPacket(byte[] data, int offset, int length);
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParserFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParserFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParserFactory.java
new file mode 100644
index 0000000..c69b78d
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParserFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast;
+
+import java.io.IOException;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory used to find and create instances of DiscoveryAgent using the name
+ * of the desired agent to locate it's factory class definition file.
+ */
+public abstract class PacketParserFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PacketParserFactory.class);
+
+    private static final FactoryFinder<PacketParserFactory> AGENT_FACTORY_FINDER =
+        new FactoryFinder<PacketParserFactory>(
+            PacketParserFactory.class,
+            "META-INF/services/org/apache/qpid/jms/provider/agents/multicast-parsers/");
+
+    /**
+     * Creates an instance of the given PacketParser
+     *
+     * @param key
+     *        the name of the required packet parser for the agent.
+     *
+     * @return a new PacketParser instance.
+     *
+     * @throws Exception if an error occurs while creating the PacketParser instance.
+     */
+    public abstract PacketParser createPacketParser(String key) throws Exception;
+
+    /**
+     * @return the name of this packet parser, e.g ActiveMQ.
+     */
+    public abstract String getName();
+
+    /**
+     * Static create method that performs the PacketParser search and handles the
+     * configuration and setup.
+     *
+     * @param key
+     *        the name of the desired PacketParser type.
+     *
+     * @return a new PacketParser instance that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the PacketParser instance.
+     */
+    public static PacketParser createAgent(String key) throws Exception {
+        PacketParser result = null;
+
+        try {
+            PacketParserFactory factory = findAgentFactory(key);
+            result = factory.createPacketParser(key);
+        } catch (Exception ex) {
+            LOG.error("Failed to create PacketParserFactory instance for: {}", key);
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+
+        return result;
+    }
+
+    /**
+     * Searches for a PacketParserFactory by using the given key.
+     *
+     * The search first checks the local cache of packet parser factories before moving on
+     * to search in the classpath.
+     *
+     * @param key
+     *        The name of the PacketParserFactory that should be located.
+     *
+     * @return a PacketParserFactory instance matching the given key.
+     *
+     * @throws IOException if an error occurs while locating the factory.
+     */
+    protected static PacketParserFactory findAgentFactory(String key) throws IOException {
+        if (key == null) {
+            throw new IOException("No PacketParserFactory name specified: [" + key + "]");
+        }
+
+        PacketParserFactory factory = null;
+        if (factory == null) {
+            try {
+                factory = AGENT_FACTORY_FINDER.newInstance(key);
+            } catch (Throwable e) {
+                throw new IOException("Discovery Agent scheme NOT recognized: [" + key + "]", e);
+            }
+        }
+
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
new file mode 100644
index 0000000..1cb2935
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast.parsers;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType;
+import org.apache.qpid.jms.provider.discovery.multicast.PacketParser;
+
+/**
+ * Parser instance for ActiveMQ multicast discovery processing.
+ */
+public class ActiveMQPacketParser implements PacketParser {
+
+    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
+    private static final String ALIVE = "alive.";
+    private static final String DEAD = "dead.";
+    private static final String DELIMITER = "%";
+
+    private String group;
+
+    @Override
+    public String getGroup() {
+        return this.group;
+    }
+
+    @Override
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    @Override
+    public DiscoveryEvent processPacket(byte[] packet, int offset, int length) {
+        String str = new String(packet, offset, length);
+        DiscoveryEvent event = null;
+        if (str.startsWith(getType())) {
+            String payload = str.substring(getType().length());
+            if (payload.startsWith(ALIVE)) {
+                String brokerName = getBrokerName(payload.substring(ALIVE.length()));
+                String brokerUri = payload.substring(ALIVE.length() + brokerName.length() + 2);
+                event = new DiscoveryEvent(brokerUri, EventType.ALIVE);
+            } else {
+                String brokerName = getBrokerName(payload.substring(DEAD.length()));
+                String brokerUri = payload.substring(DEAD.length() + brokerName.length() + 2);
+                event = new DiscoveryEvent(brokerUri, EventType.SHUTDOWN);
+            }
+        }
+        return event;
+    }
+
+    private String getBrokerName(String str) {
+        String result = null;
+        int start = str.indexOf(DELIMITER);
+        if (start >= 0) {
+            int end = str.indexOf(DELIMITER, start + 1);
+            result = str.substring(start + 1, end);
+        }
+        return result;
+    }
+
+    private String getType() {
+        return group + "." + TYPE_SUFFIX;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParserFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParserFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParserFactory.java
new file mode 100644
index 0000000..67f8a48
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParserFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast.parsers;
+
+import org.apache.qpid.jms.provider.discovery.multicast.PacketParser;
+import org.apache.qpid.jms.provider.discovery.multicast.PacketParserFactory;
+
+/**
+ * Factory class for the ActiveMQ Packet Parser used to process data set over
+ * multicast when discovering ActiveMQ Brokers.
+ */
+public class ActiveMQPacketParserFactory extends PacketParserFactory {
+
+    @Override
+    public PacketParser createPacketParser(String key) throws Exception {
+        return new ActiveMQPacketParser();
+    }
+
+    @Override
+    public String getName() {
+        return "ActiveMQ";
+    }
+
+    @Override
+    public String toString() {
+        return getName() + ": Discovery Parser.";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast
new file mode 100644
index 0000000..1fdf0b2
--- /dev/null
+++ b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.discovery.multicast.MulticastDiscoveryAgentFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast-parsers/activemq
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast-parsers/activemq b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast-parsers/activemq
new file mode 100644
index 0000000..8278f11
--- /dev/null
+++ b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/multicast-parsers/activemq
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.discovery.multicast.parsers.ActiveMQPacketParserFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/discovery
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/discovery b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/discovery
new file mode 100644
index 0000000..a5b8170
--- /dev/null
+++ b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/discovery
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.discovery.DiscoveryProviderFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-examples/.gitignore
----------------------------------------------------------------------
diff --git a/qpid-jms-examples/.gitignore b/qpid-jms-examples/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/qpid-jms-examples/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-examples/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-examples/pom.xml b/qpid-jms-examples/pom.xml
new file mode 100644
index 0000000..bd2b656
--- /dev/null
+++ b/qpid-jms-examples/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>qpid-jms-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>qpid-jms-examples</artifactId>
+  <packaging>jar</packaging>
+  <name>QpidJMS Examples</name>
+  <description>Examples for QpidJMS</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-jms-client</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Drain.java
----------------------------------------------------------------------
diff --git a/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Drain.java b/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Drain.java
new file mode 100644
index 0000000..9e8ffa5
--- /dev/null
+++ b/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Drain.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.example;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class Drain
+{
+    private static final String DEFAULT_USER = "guest";
+    private static final String DEFAULT_PASSWORD = "guest";
+    private static final int DEFAULT_PORT = 5672;
+    private static final String DEFAULT_HOST = "localhost";
+    private static final int DEFAULT_COUNT = 10000;
+
+    private String _hostname;
+    private int _port;
+    private int _count;
+    private String _username;
+    private String _password;
+    private String _queuePrefix;
+
+    public Drain(int count, String hostname, int port, String queuePrefix)
+    {
+        _count = count;
+        _hostname = hostname;
+        _port = port;
+        _username = DEFAULT_USER;
+        _password = DEFAULT_PASSWORD;
+        _queuePrefix = queuePrefix;
+    }
+
+    public void runExample()
+    {
+        try
+        {
+            //TODO: use JNDI lookup rather than direct instantiation
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqp://" + _hostname + ":" + _port);
+            if(_queuePrefix != null)
+            {
+                //TODO: use URL options?
+                factory.setQueuePrefix(_queuePrefix);
+            }
+
+            Connection connection = factory.createConnection(_username,_password);
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Destination queue = session.createQueue("myQueue");
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            long start = System.currentTimeMillis();
+
+            int actualCount = 0;
+            boolean deductTimeout = false;
+            int timeout = 1000;
+            for(int i = 1; i <= _count; i++, actualCount++)
+            {
+                TextMessage message = (TextMessage)messageConsumer.receive(timeout);
+                if(message == null)
+                {
+                    System.out.println("Message not received, stopping");
+                    deductTimeout = true;
+                    break;
+                }
+                if(i % 100 == 0)
+                {
+                    System.out.println("Got message " + i + ":" + message.getText());
+                }
+            }
+
+            long finish = System.currentTimeMillis();
+            long taken = finish - start;
+            if(deductTimeout)
+            {
+                taken -= timeout;
+            }
+            System.out.println("Received " + actualCount +" messages in " + taken + "ms");
+
+            connection.close();
+        }
+        catch (Exception exp)
+        {
+            exp.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        List<String> switches = new ArrayList<String>();
+        List<String> args = new ArrayList<String>();
+        for (String s : argv)
+        {
+            if (s.startsWith("-"))
+            {
+                switches.add(s);
+            }
+            else
+            {
+                args.add(s);
+            }
+        }
+
+        int count = args.isEmpty() ? DEFAULT_COUNT : Integer.parseInt(args.remove(0));
+        String hostname = args.isEmpty() ? DEFAULT_HOST : args.remove(0);
+        int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.remove(0));
+        String queuePrefix = args.isEmpty() ? null : args.remove(0);
+
+        new Drain(count, hostname, port, queuePrefix).runExample();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Spout.java
----------------------------------------------------------------------
diff --git a/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Spout.java b/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Spout.java
new file mode 100644
index 0000000..f6b15e7
--- /dev/null
+++ b/qpid-jms-examples/src/main/java/org/apache/qpid/jms/example/Spout.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.example;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class Spout
+{
+    private static final String DEFAULT_USER = "guest";
+    private static final String DEFAULT_PASSWORD = "guest";
+    private static final int DEFAULT_PORT = 5672;
+    private static final String DEFAULT_HOST = "localhost";
+    private static final int DEFAULT_COUNT = 10000;
+
+    private String _hostname;
+    private int _port;
+    private int _count;
+    private String _username;
+    private String _password;
+    private String _queuePrefix;
+    private boolean _persistent;
+
+    public Spout(int count, String hostname, int port, String queuePrefix, boolean persistent)
+    {
+        _count = count;
+        _hostname = hostname;
+        _port = port;
+        _persistent = persistent;
+        _username = DEFAULT_USER;
+        _password = DEFAULT_PASSWORD;
+        _queuePrefix = queuePrefix;
+    }
+
+    public void runExample()
+    {
+        try
+        {
+            //TODO: use JNDI lookup rather than direct instantiation
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqp://" + _hostname + ":" + _port);
+            if(_queuePrefix != null)
+            {
+                //TODO: use URL options?
+                factory.setQueuePrefix(_queuePrefix);
+            }
+
+            Connection connection = factory.createConnection(_username,_password);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Destination queue = session.createQueue("myQueue");
+            MessageProducer messageProducer = session.createProducer(queue);
+
+            int dekiveryMode = _persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+            long start = System.currentTimeMillis();
+            for(int i = 1; i <= _count; i++)
+            {
+                TextMessage message = session.createTextMessage("Hello world!");
+                messageProducer.send(message, dekiveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+                if(i % 100 == 0)
+                {
+                    System.out.println("Sent message " + i + ":" + message.getText());
+                }
+            }
+
+            long finish = System.currentTimeMillis();
+            long taken = finish - start;
+            System.out.println("Sent " + _count +" messages in " + taken + "ms");
+
+            connection.close();
+        }
+        catch (Exception exp)
+        {
+            exp.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    public static void main(String[] argv) throws Exception
+    {
+        List<String> switches = new ArrayList<String>();
+        List<String> args = new ArrayList<String>();
+        for (String s : argv)
+        {
+            if (s.startsWith("-"))
+            {
+                switches.add(s);
+            }
+            else
+            {
+                args.add(s);
+            }
+        }
+
+        int count = args.isEmpty() ? DEFAULT_COUNT : Integer.parseInt(args.remove(0));
+        String hostname = args.isEmpty() ? DEFAULT_HOST : args.remove(0);
+        int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.remove(0));
+        String queuePrefix = args.isEmpty() ? null : args.remove(0);
+        boolean persistent = switches.contains("-p");
+
+        new Spout(count, hostname, port, queuePrefix, persistent).runExample();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/.gitignore
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/.gitignore b/qpid-jms-interop-tests/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/qpid-jms-interop-tests/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/README.md
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/README.md b/qpid-jms-interop-tests/README.md
new file mode 100644
index 0000000..ba1bae9
--- /dev/null
+++ b/qpid-jms-interop-tests/README.md
@@ -0,0 +1,3 @@
+AMQP JMS Client Broker Interop tests
+----------------------------------------------
+This module contains maven submodules that should exercise the JMS client against different AMQP message brokers in order to ensure that it can work with as many AMQP brokers as possible.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/pom.xml b/qpid-jms-interop-tests/pom.xml
new file mode 100644
index 0000000..30a9101
--- /dev/null
+++ b/qpid-jms-interop-tests/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>qpid-jms-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>qpid-jms-interop-tests</artifactId>
+  <name>QpidJMS Broker Interop Tests</name>
+  <description>A set of modules meant for testing interoperability between the client and various AMQP Brokers.</description>
+  <packaging>pom</packaging>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <modules>
+    <module>qpid-jms-activemq-tests</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/.gitignore
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/.gitignore b/qpid-jms-interop-tests/qpid-jms-activemq-tests/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/pom.xml b/qpid-jms-interop-tests/qpid-jms-activemq-tests/pom.xml
new file mode 100644
index 0000000..0087cd0
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>qpid-jms-interop-tests</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>qpid-jms-activemq-tests</artifactId>
+  <name>QpidJMS ActiveMQ Interop Tests</name>
+  <description>Tests the JMS client against the ActiveMQ Broker.</description>
+  <packaging>jar</packaging>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <!-- =================================== -->
+    <!-- Required Dependencies                -->
+    <!-- =================================== -->
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-jms-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-jms-discovery</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-selector</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- =================================== -->
+    <!-- Testing Dependencies                -->
+    <!-- =================================== -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--  Joram JMS conformance tests -->
+    <dependency>
+      <groupId>org.fusesource.joram-jms-tests</groupId>
+      <artifactId>joram-jms-tests</artifactId>
+      <version>${fuse-joram-tests-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <!-- using it for Jetty's JNDI context to work /w Joram tests. -->
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>${jetty-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-amqp</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-jaas</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-spring</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionCloseVariationsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionCloseVariationsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionCloseVariationsTest.java
new file mode 100644
index 0000000..32bd170
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionCloseVariationsTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * A test case for Connection close called under different circumstances.
+ */
+public class JmsConnectionCloseVariationsTest extends AmqpTestSupport {
+
+    @Test(timeout=60000)
+    public void testCloseAfterBrokerStopped() throws Exception {
+        doTestConnectionClosedAfterBrokerStopped();
+    }
+
+    @Test(timeout=90000)
+    public void testCloseAfterBrokerStoppedRepeated() throws Exception {
+        for (int i = 0; i < 50; ++i) {
+            doTestConnectionClosedAfterBrokerStopped();
+            restartPrimaryBroker();
+        }
+    }
+
+    private void doTestConnectionClosedAfterBrokerStopped() throws Exception {
+        Connection connection = createAmqpConnection();
+        connection.start();
+        stopPrimaryBroker();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testCloseBeforeBrokerStopped() throws Exception {
+        doTestConnectionClosedBeforeBrokerStopped();
+    }
+
+    @Test(timeout=90000)
+    public void testCloseBeforeBrokerStoppedRepeated() throws Exception {
+        for (int i = 0; i < 50; ++i) {
+            doTestConnectionClosedBeforeBrokerStopped();
+            restartPrimaryBroker();
+        }
+    }
+
+    private void doTestConnectionClosedBeforeBrokerStopped() throws Exception {
+        Connection connection = createAmqpConnection();
+        connection.start();
+        connection.close();
+        stopPrimaryBroker();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionClosedTest.java
new file mode 100644
index 0000000..773deeb
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionClosedTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test Connection methods contracts when state is closed.
+ */
+public class JmsConnectionClosedTest extends AmqpTestSupport {
+
+    protected Destination destination;
+
+    protected Connection createConnection() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = session.createTopic("test");
+        connection.close();
+        return connection;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        createConnection();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetClientIdFails() throws JMSException {
+        connection.getClientID();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetClientIdFails() throws JMSException {
+        connection.setClientID("test");
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMetaData() throws JMSException {
+        connection.getMetaData();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetExceptionListener() throws JMSException {
+        connection.getExceptionListener();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetExceptionListener() throws JMSException {
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+            }
+        });
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testStartFails() throws JMSException {
+        connection.start();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testStopFails() throws JMSException {
+        connection.stop();
+    }
+
+    @Test(timeout=30000)
+    public void testClose() throws JMSException {
+        connection.close();
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateConnectionConsumerFails() throws JMSException {
+        connection.createConnectionConsumer(destination, "", null, 1);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateDurableConnectionConsumerFails() throws JMSException {
+        connection.createDurableConnectionConsumer((Topic) destination, "id", "", null, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
new file mode 100644
index 0000000..4bf4f73
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+public class JmsConnectionConcurrentCloseCallsTest extends AmqpTestSupport {
+
+    private JmsConnection connection;
+    private ExecutorService executor;
+    private final int size = 200;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        executor = Executors.newFixedThreadPool(20);
+        connection = (JmsConnection) createAmqpConnection();
+        connection.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (connection.isStarted()) {
+            connection.stop();
+        }
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+
+        super.tearDown();
+    }
+
+    @Test(timeout=200000)
+    public void testCloseMultipleTimes() throws Exception {
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertTrue(connection.isStarted());
+        assertFalse(connection.isClosed());
+
+        final CountDownLatch latch = new CountDownLatch(size);
+
+        for (int i = 0; i < size; i++) {
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        connection.close();
+                        assertFalse(connection.isStarted());
+                        assertTrue(connection.isClosed());
+                        latch.countDown();
+                    } catch (Throwable e) {
+                        LOG.warn("Caught an exception: {}", e);
+                    }
+                }
+            });
+        }
+
+        boolean zero = latch.await(200, TimeUnit.SECONDS);
+        assertTrue("Should complete all", zero);
+
+        // should not fail calling again
+        connection.close();
+
+        assertFalse(connection.isStarted());
+        assertTrue(connection.isClosed());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
new file mode 100644
index 0000000..b1dc4da
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+public class JmsConnectionFactoryTest extends AmqpTestSupport {
+
+    private final String username = "USER";
+    private final String password = "PASSWORD";
+
+    protected String getGoodProviderAddress() {
+        return getBrokerAmqpConnectionURI().toString();
+    }
+
+    protected URI getGoodProviderAddressURI() throws URISyntaxException {
+        return new URI(getGoodProviderAddress());
+    }
+
+    protected String getBadProviderAddress() {
+        return "bad://127.0.0.1:" + 5763;
+    }
+
+    protected URI getBadProviderAddressURI() throws URISyntaxException {
+        return new URI(getBadProviderAddress());
+    }
+
+    @Test(timeout=60000)
+    public void testConnectionFactoryCreate() {
+        JmsConnectionFactory factory = new JmsConnectionFactory();
+        assertNull(factory.getUsername());
+        assertNull(factory.getPassword());
+    }
+
+    @Test(timeout=60000)
+    public void testConnectionFactoryCreateUsernameAndPassword() {
+        JmsConnectionFactory factory = new JmsConnectionFactory(username, password);
+        assertNotNull(factory.getUsername());
+        assertNotNull(factory.getPassword());
+        assertEquals(username, factory.getUsername());
+        assertEquals(password, factory.getPassword());
+    }
+
+    @Test(expected = JMSException.class)
+    public void testCreateConnectionBadProviderURI() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBadProviderAddressURI());
+        factory.createConnection();
+    }
+
+    @Test(expected = JMSException.class)
+    public void testCreateConnectionBadProviderString() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBadProviderAddress());
+        factory.createConnection();
+    }
+
+    @Test(timeout=60000)
+    public void testCreateConnectionGoodProviderURI() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getGoodProviderAddressURI());
+        Connection connection = factory.createConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testCreateConnectionGoodProviderString() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getGoodProviderAddress());
+        Connection connection = factory.createConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testUriOptionsApplied() throws Exception {
+        String uri = getGoodProviderAddress() + "?jms.omitHost=true&jms.forceAsyncSend=true";
+        JmsConnectionFactory factory = new JmsConnectionFactory(uri);
+        assertTrue(factory.isOmitHost());
+        assertTrue(factory.isForceAsyncSend());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        assertTrue(connection.isOmitHost());
+        assertTrue(connection.isForceAsyncSend());
+        connection.close();
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testBadUriOptionCausesFail() throws Exception {
+        String uri = getGoodProviderAddress() + "?jms.omitHost=true&jms.badOption=true";
+        new JmsConnectionFactory(uri);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFailedTest.java
new file mode 100644
index 0000000..b14792e
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionFailedTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.Wait;
+
+/**
+ * Test Connection methods contracts when connection has failed.
+ */
+public class JmsConnectionFailedTest extends JmsConnectionClosedTest {
+
+    @Override
+    protected Connection createConnection() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        connection = createAmqpConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                latch.countDown();
+            }
+        });
+        connection.start();
+        stopPrimaryBroker();
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+        final JmsConnection jmsConnection = (JmsConnection) connection;
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !jmsConnection.isConnected();
+            }
+        }));
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
new file mode 100644
index 0000000..d328dab
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Test for basic JmsConnection functionality and error handling.
+ */
+public class JmsConnectionTest extends AmqpTestSupport {
+
+    @Test(timeout=30000)
+    public void testCreateConnection() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionAndStart() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(expected = JMSException.class)
+    public void testCreateWithDuplicateClientIdFails() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection1 = (JmsConnection) factory.createConnection();
+        connection1.setClientID("Test");
+        assertNotNull(connection1);
+        connection1.start();
+        JmsConnection connection2 = (JmsConnection) factory.createConnection();
+        connection2.setClientID("Test");
+        connection2.start();
+
+        connection1.close();
+        connection2.close();
+    }
+
+    @Test(expected = JMSException.class)
+    public void testSetClientIdAfterStartedFails() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        connection.setClientID("Test");
+        connection.start();
+        connection.setClientID("NewTest");
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionAsSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("system");
+        factory.setPassword("manager");
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testCreateConnectionCallSystemAdmin() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection("system", "manager");
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionAsUnknwonUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        factory.setUsername("unknown");
+        factory.setPassword("unknown");
+        JmsConnection connection = (JmsConnection) factory.createConnection();
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=30000, expected = JMSSecurityException.class)
+    public void testCreateConnectionCallUnknwonUser() throws Exception {
+        JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
+        JmsConnection connection = (JmsConnection) factory.createConnection("unknown", "unknown");
+        assertNotNull(connection);
+        connection.start();
+        connection.close();
+    }
+
+    @Test(timeout=60000)
+    public void testConnectionExceptionBrokerStop() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        Connection connection = createAmqpConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                latch.countDown();
+            }
+        });
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+
+        stopPrimaryBroker();
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        connection.close();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org