You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/10/13 17:06:27 UTC
svn commit: r1022127 [7/15] - in
/qpid/branches/grkvlt-network-20101013/qpid/java: ./
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/
broker-plugins/access-control/src/test/java/org/apache/qpid/server/securit...
Added: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/topic/TopicParserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/topic/TopicParserTest.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/topic/TopicParserTest.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/topic/TopicParserTest.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,175 @@
+package org.apache.qpid.server.exchange.topic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * This <em>should</em> test the {@link TopicParser}.
+ *
+ * TODO add test case assertions
+ */
+public class TopicParserTest extends QpidTestCase
+{
+ public void _estParser() // Logs matcher output for a fixed list of binding-key/routing-key pairs; it asserts nothing.
+ { // NOTE(review): the name lacks the "test" prefix, so presumably the JUnit runner skips it - confirm this is intentional
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); // one long pattern mixing '#' and '*' against the 26-word key a..z
+ printMatches(new String[]{ // 17 patterns "#.a.#" .. "#.q.#" merged into one state machine
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#"
+
+ }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+/* The calls below are commented out and never run:
+ printMatches(new String[]{
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#",
+ "#.r.#",
+ "#.s.#",
+ "#.t.#",
+ "#.u.#",
+ "#.v.#",
+ "#.w.#",
+ "#.x.#",
+ "#.y.#",
+ "#.z.#"
+
+
+ },"a.b");
+
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
+
+*/
+
+ printMatches("",""); // from here on: one binding key per call, starting with empty and literal keys
+ printMatches("a","a");
+ printMatches("a","");
+ printMatches("","a");
+ printMatches("a.b","a.b");
+ printMatches("a","a.b");
+ printMatches("a.b","a");
+ printMatches("*","a"); // binding keys containing '*'
+ printMatches("*.b","a.b");
+ printMatches("*.*","a.b");
+ printMatches("a.*","a.b");
+ printMatches("a.*.#","a.b");
+ printMatches("a.#.b","a.b");
+
+ printMatches("#.b","a"); // binding keys containing '#'
+ printMatches("#.b","a.b");
+ printMatches("#.a.b","a.b");
+
+ printMatches("#","");
+ printMatches("#","a");
+ printMatches("#","a.b");
+ printMatches("#.#","a.b");
+ printMatches("#.*","a.b");
+
+ printMatches("#.a.b","a.b"); // repeats the "#.a.b" / "a.b" pair already logged above
+ printMatches("a.b.#","a.b");
+ printMatches("a.#","a.b");
+ printMatches("#.*.#","a.b");
+ printMatches("#.*.b.#","a.b");
+ printMatches("#.a.*.#","a.b");
+ printMatches("#.a.#.b.#","a.b");
+ printMatches("#.*.#.*.#","a.b");
+ printMatches("*.#.*.#","a.b");
+ printMatches("#.*.#.*","a.b");
+
+ printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c"); // from here on: several binding keys merged and matched against one routing key
+
+ printMatches(new String[]{"a.b", "a.c"},"a.b");
+ printMatches(new String[]{"a.#", "a.c", "#.b"},"a.b");
+ printMatches(new String[]{"a.#", "a.c", "#.b", "#", "*.*"},"a.b");
+
+ printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e"); // the same seven patterns are run against two different routing keys
+ printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g");
+ }
+
+ /**
+ * Merges {@code bindingKeys} into one state machine, parses {@code routingKey} with it and logs matched/unmatched keys.
+ * An empty {@code bindingKeys} array is logged as an empty result instead of dereferencing a null state machine.
+ */
+ private void printMatches(final String[] bindingKeys, final String routingKey)
+ {
+ if(bindingKeys.length == 0)
+ {
+ // No key means no state machine: sm would stay null and sm.parse() below would throw a NullPointerException.
+ _logger.info("\""+routingKey+"\" matched with [] DID NOT MATCH with []");
+ return;
+ }
+
+ TopicMatcherDFAState sm = null;
+ Map<TopicMatcherResult, String> resultMap = new HashMap<TopicMatcherResult, String>(); // result marker -> binding key it was created for
+ TopicParser parser = new TopicParser();
+ long start = System.currentTimeMillis();
+ for(int i = 0; i < bindingKeys.length; i++)
+ {
+ _logger.debug((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
+ TopicMatcherResult r = new TopicMatcherResult(){};
+ resultMap.put(r, bindingKeys[i]);
+ AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
+ _logger.info("=====================================================");
+ _logger.info("Adding binding key: " + bindingKeyShortString);
+ _logger.info("-----------------------------------------------------");
+ // The first key seeds the state machine; every later key is merged into the accumulated one.
+ sm = (i == 0)
+ ? parser.createStateMachine(bindingKeyShortString, r)
+ : sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
+ _logger.debug(sm.reachableStates());
+ _logger.debug("=====================================================");
+ }
+ AMQShortString routingKeyShortString = new AMQShortString(routingKey);
+ Collection<TopicMatcherResult> results = sm.parse(parser.getDictionary(), routingKeyShortString);
+ Collection<String> resultStrings = new ArrayList<String>(); // binding keys whose result object came back from the parse
+ for(TopicMatcherResult result : results)
+ {
+ resultStrings.add(resultMap.get(result));
+ }
+
+ final ArrayList<String> nonMatches = new ArrayList<String>(Arrays.asList(bindingKeys));
+ nonMatches.removeAll(resultStrings);
+ _logger.info("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
+ }
+
+ private void printMatches(String bindingKey, String routingKey) // Single-key convenience overload.
+ {
+ printMatches(new String[] { bindingKey }, routingKey); // wraps the key in a one-element array and delegates to the String[] overload
+ }
+}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/Log4jMessageLoggerTest.java Wed Oct 13 15:05:29 2010
@@ -20,25 +20,25 @@
*/
package org.apache.qpid.server.logging;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.test.utils.QpidTestCase;
-/** Test that the Log4jMessageLogger defaults behave as expected */
-public class Log4jMessageLoggerTest extends TestCase
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test that the Log4jMessageLogger defaults behave as expected
+ */
+public class Log4jMessageLoggerTest extends QpidTestCase
{
Level _rootLevel;
Log4jTestAppender _appender;
- @Override
public void setUp() throws IOException
{
// Setup a file for logging
@@ -56,7 +56,6 @@ public class Log4jMessageLoggerTest exte
root.warn("Adding Test Appender:" + _appender);
}
- @Override
public void tearDown()
{
Logger root = Logger.getRootLogger();
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java Wed Oct 13 15:05:29 2010
@@ -228,8 +228,7 @@ public class CurrentActorTest extends Ba
try
{
- AMQPConnectionActor actor = new AMQPConnectionActor(getSession(),
- new NullRootMessageLogger());
+ AMQPConnectionActor actor = new AMQPConnectionActor(getSession(), new NullRootMessageLogger());
CurrentActor.set(actor);
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Wed Oct 13 15:05:29 2010
@@ -40,7 +40,7 @@ public class ConnectionLogSubjectTest ex
*/
protected void validateLogStatement(String message)
{
- verifyConnection(getSession().getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message);
+ verifyConnection(getSession().getSessionID(), "InternalTestProtocolSession", "127.0.0.1:12345", "test", message);
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Wed Oct 13 15:05:29 2010
@@ -20,24 +20,30 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.security.Principal;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
@@ -47,7 +53,7 @@ public class InternalTestProtocolSession
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), null, null, null, 0);
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
@@ -63,11 +69,33 @@ public class InternalTestProtocolSession
setVirtualHost(virtualHost);
}
+ public void closeProtocolSession() // Only moves the state manager to CONNECTION_CLOSED; nothing else is done here.
+ {
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ catch (AMQException e)
+ {
+ _logger.info(e.getMessage()); // the failure is logged (message only, no stack trace) and not rethrown
+ }
+ }
+
+ public void writeFrame(AMQDataBlock frame) // Logs the size of the frame; this method does not write it anywhere.
+ {
+ _logger.info("writing frame " + frame.getSize() + " bytes");
+ }
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return this;
}
+ public SocketAddress getRemoteAddress() // Returns a fixed address; a new InetSocketAddress is created on every call.
+ {
+ return new InetSocketAddress("localhost", 12345); // NOTE(review): ConnectionLogSubjectTest expects "127.0.0.1:12345" - presumably derived from this value; confirm
+ }
+
public byte getProtocolMajorVersion()
{
return (byte) 8;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Oct 13 15:05:29 2010
@@ -418,11 +418,6 @@ public class AMQQueueMBeanTest extends I
_queueMBean = new AMQQueueMBean(getQueue());
}
- public void tearDown()
- {
- ApplicationRegistry.remove();
- }
-
private void sendMessages(int messageCount, boolean persistent) throws AMQException
{
for (int i = 0; i < messageCount; i++)
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Wed Oct 13 15:05:29 2010
@@ -309,7 +309,7 @@ public class AckTest extends InternalBro
}
}
- /**
+ /**
* A regression fixing QPID-1136 showed this up
*
* @throws Exception
@@ -420,8 +420,4 @@ public class AckTest extends InternalBro
}
*/
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(AckTest.class);
- }
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Oct 13 15:05:29 2010
@@ -55,7 +55,6 @@ public class SimpleAMQQueueTest extends
{
protected SimpleAMQQueue _queue;
- protected VirtualHost _virtualHost;
protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
@@ -97,22 +96,17 @@ public class SimpleAMQQueueTest extends
public void setUp() throws Exception
{
super.setUp();
- //Create Application Registry for test
- ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
- PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store);
- applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, getVirtualHost(), _arguments);
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments);
-
- _exchange = (DirectExchange)_virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ _exchange = (DirectExchange) getVirtualHost().getExchangeRegistry().getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@Override
public void tearDown() throws Exception
{
_queue.stop();
+
super.tearDown();
}
@@ -120,7 +114,7 @@ public class SimpleAMQQueueTest extends
{
_queue.stop();
try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, _virtualHost, _arguments );
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, false, getVirtualHost(), _arguments );
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -140,18 +134,18 @@ public class SimpleAMQQueueTest extends
}
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
- false, _virtualHost, _arguments);
+ false, getVirtualHost(), _arguments);
assertNotNull("Queue was not created", _queue);
}
public void testGetVirtualHost()
{
- assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
+ assertEquals("Virtual host was wrong", getVirtualHost(), _queue.getVirtualHost());
}
public void testBinding() throws AMQSecurityException, AMQInternalException
{
- _virtualHost.getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
+ getVirtualHost().getBindingFactory().addBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
assertTrue("Routing key was not bound",
_exchange.isBound(_routingKey));
@@ -164,7 +158,7 @@ public class SimpleAMQQueueTest extends
assertEquals("Wrong exchange bound", _exchange,
_queue.getBindings().get(0).getExchange());
- _virtualHost.getBindingFactory().removeBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
+ getVirtualHost().getBindingFactory().removeBinding(String.valueOf(_routingKey), _queue, _exchange, Collections.EMPTY_MAP);
assertFalse("Routing key was still bound",
_exchange.isBound(_routingKey));
@@ -255,7 +249,7 @@ public class SimpleAMQQueueTest extends
public void testAutoDeleteQueue() throws Exception
{
_queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
+ _queue = new SimpleAMQQueue(_qname, false, null, true, false, getVirtualHost(), Collections.EMPTY_MAP);
_queue.setDeleteOnNoConsumers(true);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Wed Oct 13 15:05:29 2010
@@ -37,23 +37,16 @@ public class SimpleAMQQueueThreadPoolTes
int initialCount = ReferenceCountingExecutorService.getInstance().getReferenceCount();
VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
- try
- {
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false,
- new AMQShortString("owner"),
- false, false, test, null);
+ SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false,
+ new AMQShortString("owner"),
+ false, false, test, null);
- assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
+ assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
- assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
- queue.stop();
+ queue.stop();
- assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
- }
- finally
- {
- ApplicationRegistry.remove();
- }
+ assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java Wed Oct 13 15:05:29 2010
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.server.registry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-import java.security.Security;
import java.security.Provider;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.LinkedList;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
/**
* QPID-1390 : Test to validate that the AuthenticationManger can successfully unregister any new SASL providers when
@@ -35,71 +36,51 @@ import java.util.LinkedList;
*/
public class ApplicationRegistryShutdownTest extends InternalBrokerBaseCase
{
+ List<Provider> _before;
- Provider[] _defaultProviders;
@Override
public void setUp() throws Exception
{
// Get default providers
- _defaultProviders = Security.getProviders();
+ _before = Arrays.asList(Security.getProviders());
//Startup the new broker and register the new providers
super.setUp();
}
-
/**
* QPID-1399 : Ensure that the Authentiction manager unregisters any SASL providers created during
* ApplicationRegistry initialisation.
- *
*/
public void testAuthenticationMangerCleansUp() throws Exception
{
-
// Get the providers after initialisation
- Provider[] providersAfterInitialisation = Security.getProviders();
+ List<Provider> after = Arrays.asList(Security.getProviders());
// Find the additions
- List additions = new LinkedList();
- for (Provider afterInit : providersAfterInitialisation)
+ List<Provider> additions = new ArrayList<Provider>();
+ for (Provider provider : after)
{
- boolean found = false;
- for (Provider defaultProvider : _defaultProviders)
+ if (!_before.contains(provider))
{
- if (defaultProvider == afterInit)
- {
- found=true;
- break;
- }
- }
-
- // Record added registies
- if (!found)
- {
- additions.add(afterInit);
+ additions.add(provider);
}
}
- // Not using isEmpty as that is not in Java 5
- assertTrue("No new SASL mechanisms added by initialisation.", additions.size() != 0 );
+ assertFalse("No new SASL mechanisms added by initialisation.", additions.isEmpty());
//Close the registry which will perform the close the AuthenticationManager
getRegistry().close();
- //Validate that the SASL plugFins have been removed.
- Provider[] providersAfterClose = Security.getProviders();
+ //Validate that the SASL plugins have been removed.
+ List<Provider> closed = Arrays.asList(Security.getProviders());
- assertTrue("No providers unregistered", providersAfterInitialisation.length > providersAfterClose.length);
+ assertEquals("No providers unregistered", _before.size(), closed.size());
//Ensure that the additions are not still present after close().
- for (Provider afterClose : providersAfterClose)
+ for (Provider provider : closed)
{
- assertFalse("Added provider not unregistered", additions.contains(afterClose));
+ assertFalse("Added provider not unregistered: " + provider.getName(), additions.contains(provider));
}
}
-
-
-
-
-
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java Wed Oct 13 15:05:29 2010
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.security.auth.database;
-import junit.framework.TestCase;
-
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.AccountNotFoundException;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -42,7 +34,14 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
-public class Base64MD5PasswordFilePrincipalDatabaseTest extends TestCase
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class Base64MD5PasswordFilePrincipalDatabaseTest extends QpidTestCase
{
private static final String TEST_COMMENT = "# Test Comment";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java Wed Oct 13 15:05:29 2010
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.server.security.auth.database;
-import junit.framework.TestCase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
import java.io.UnsupportedEncodingException;
+import org.apache.qpid.test.utils.QpidTestCase;
+
/*
Note User is mainly tested by Base64MD5PFPDTest this is just to catch the extra methods
*/
-public class HashedUserTest extends TestCase
+public class HashedUserTest extends QpidTestCase
{
String USERNAME = "username";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabaseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabaseTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabaseTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabaseTest.java Wed Oct 13 15:05:29 2010
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.server.security.auth.database;
-import junit.framework.TestCase;
-
-import javax.security.auth.login.AccountNotFoundException;
-
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -38,7 +32,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
-public class PlainPasswordFilePrincipalDatabaseTest extends TestCase
+import javax.security.auth.login.AccountNotFoundException;
+
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PlainPasswordFilePrincipalDatabaseTest extends QpidTestCase
{
private static final String TEST_COMMENT = "# Test Comment";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainUserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainUserTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainUserTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/PlainUserTest.java Wed Oct 13 15:05:29 2010
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.security.auth.database;
-import junit.framework.TestCase;
+import org.apache.qpid.test.utils.QpidTestCase;
/*
Note PlainUser is mainly tested by PlainPFPDTest, this is just to catch the extra methods
*/
-public class PlainUserTest extends TestCase
+public class PlainUserTest extends QpidTestCase
{
String USERNAME = "username";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java Wed Oct 13 15:05:29 2010
@@ -31,10 +31,9 @@ import javax.security.auth.Subject;
import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.test.utils.QpidTestCase;
-import junit.framework.TestCase;
-
-public class RMIPasswordAuthenticatorTest extends TestCase
+public class RMIPasswordAuthenticatorTest extends QpidTestCase
{
private final String USERNAME = "guest";
private final String PASSWORD = "guest";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexInitialiserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexInitialiserTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexInitialiserTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexInitialiserTest.java Wed Oct 13 15:05:29 2010
@@ -20,23 +20,24 @@
*/
package org.apache.qpid.server.security.auth.sasl;
-import junit.framework.TestCase;
-import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexInitialiser;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Properties;
+
+import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexInitialiser;
+import org.apache.qpid.test.utils.QpidTestCase;
/**
* These tests ensure that the Hex wrapping that the initialiser performs does actually operate when the handle method is called.
*/
-public class CRAMMD5HexInitialiserTest extends TestCase
+public class CRAMMD5HexInitialiserTest extends QpidTestCase
{
public void testHex()
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/SaslServerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/SaslServerTestCase.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/SaslServerTestCase.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/SaslServerTestCase.java Wed Oct 13 15:05:29 2010
@@ -25,10 +25,9 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.test.utils.QpidTestCase;
-import junit.framework.TestCase;
-
-public abstract class SaslServerTestCase extends TestCase
+public abstract class SaslServerTestCase extends QpidTestCase
{
protected SaslServer server;
protected String username = "u";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Oct 13 15:05:29 2010
@@ -21,11 +21,12 @@
package org.apache.qpid.server.store;
import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -35,6 +36,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
@@ -55,6 +57,7 @@ import org.apache.qpid.server.registry.A
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.FileUtils;
@@ -66,43 +69,46 @@ import org.apache.qpid.util.FileUtils;
*/
public class MessageStoreTest extends InternalBrokerBaseCase
{
- public static final int DEFAULT_PRIORTY_LEVEL = 5;
- public static final String SELECTOR_VALUE = "Test = 'MST'";
- public static final String LVQ_KEY = "MST-LVQ-KEY";
-
- AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
- AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
- AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
-
- AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
- AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
- AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
- AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
-
- AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
- AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
- AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
- AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
- AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
- AMQShortString queueName = new AMQShortString("MST-Queue");
+ protected static final int DEFAULT_PRIORTY_LEVEL = 5;
+ protected static final String SELECTOR_VALUE = "Test = 'MST'";
+ protected static final String LVQ_KEY = "MST-LVQ-KEY";
+
+ protected static final AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
+ protected static final AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
+ protected static final AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
+
+ protected static final AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
+ protected static final AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
+ protected static final AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
+ protected static final AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+
+ protected static final AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
+ protected static final AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+ protected static final AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
+ protected static final AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
+ protected static final AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
+ protected static final AMQShortString queueName = new AMQShortString("MST-Queue");
- AMQShortString directRouting = new AMQShortString("MST-direct");
- AMQShortString topicRouting = new AMQShortString("MST-topic");
-
- AMQShortString queueOwner = new AMQShortString("MST");
-
- protected PropertiesConfiguration _config;
+ protected static final AMQShortString directRouting = new AMQShortString("MST-direct");
+ protected static final AMQShortString topicRouting = new AMQShortString("MST-topic");
+ protected static final AMQShortString queueOwner = new AMQShortString("MST");
+
+ protected XMLConfiguration _xml = new XMLConfiguration();
+ protected ServerConfiguration _config;
+
public void setUp() throws Exception
{
- super.setUp();
-
String storePath = System.getProperty("QPID_WORK") + "/" + getName();
- _config = new PropertiesConfiguration();
- _config.addProperty("store.class", getTestProfileMessageStoreClassName());
- _config.addProperty("store.environment-path", storePath);
-
+ _xml.addProperty("store.class", getTestProfileMessageStoreClassName());
+ _xml.addProperty("store.environment-path", storePath);
+
+ _config = new ServerConfiguration(_xml);
+ TestApplicationRegistry instance = new TestApplicationRegistry(_config);
+ ApplicationRegistry.initialise(instance);
+ _config.initialise();
+
cleanup(new File(storePath));
reloadVirtualHost();
@@ -117,18 +123,19 @@ public class MessageStoreTest extends In
try
{
getVirtualHost().close();
- getVirtualHost().getApplicationRegistry().
- getVirtualHostRegistry().unregisterVirtualHost(getVirtualHost());
+ getVirtualHost().getApplicationRegistry().getVirtualHostRegistry().unregisterVirtualHost(getVirtualHost());
}
catch (Exception e)
{
+ e.printStackTrace();
+ _logger.error("reload vhost fail", e);
fail(e.getMessage());
}
}
try
{
- setVirtualHost(ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config)));
+ setVirtualHost(ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _xml)));
}
catch (Exception e)
{
@@ -741,7 +748,7 @@ public class MessageStoreTest extends In
try
{
- exchange = type.newInstance(getVirtualHost(), name, durable, 0, false);
+ exchange = type.newInstance(getVirtualHost(), name, durable, 0, false, Collections.EMPTY_MAP);
}
catch (AMQException e)
{
@@ -905,4 +912,4 @@ public class MessageStoreTest extends In
return _routingKey;
}
}
-}
\ No newline at end of file
+}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Wed Oct 13 15:05:29 2010
@@ -29,13 +29,15 @@ import org.apache.qpid.framing.BasicCont
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.ReceiverFactory;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.protocol.BrokerReceiverFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -44,9 +46,11 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.util.MockChannel;
-
public class InternalBrokerBaseCase extends QpidTestCase
{
private IApplicationRegistry _registry;
@@ -59,11 +63,10 @@ public class InternalBrokerBaseCase exte
private ServerConfiguration _configuration;
private XMLConfiguration _configXml = new XMLConfiguration();
private boolean _started = false;
+ private IncomingNetworkTransport _transport;
public void setUp() throws Exception
{
- super.setUp();
-
_configXml.addProperty("virtualhosts.virtualhost.name", "test");
_configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
@@ -84,6 +87,7 @@ public class InternalBrokerBaseCase exte
_registry = new TestApplicationRegistry(_configuration);
ApplicationRegistry.initialise(_registry);
+
_registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName());
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost(getName());
@@ -108,12 +112,20 @@ public class InternalBrokerBaseCase exte
_virtualHost.getBindingFactory().addBinding(getName(), _queue, defaultExchange, null);
- _session = new InternalTestProtocolSession(_virtualHost);
- CurrentActor.set(_session.getLogActor());
+ setSession(new InternalTestProtocolSession(_virtualHost));
+ CurrentActor.set(getSession().getLogActor());
- _channel = new MockChannel(_session, 1, _messageStore);
+ _channel = new MockChannel(getSession(), 1, _messageStore);
- _session.addChannel(_channel);
+ getSession().addChannel(_channel);
+
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setProtocol("vm");
+ settings.setPort(1);
+
+ _transport = Transport.getIncomingTransport();
+ ReceiverFactory factory = new BrokerReceiverFactory();
+ _transport.accept(settings, factory, null);
}
protected void configure()
@@ -123,10 +135,11 @@ public class InternalBrokerBaseCase exte
protected void stopBroker()
{
+ //Remove the ProtocolSession Actor added during createBroker
+ CurrentActor.remove();
try
{
- //Remove the ProtocolSession Actor added during createBroker
- CurrentActor.remove();
+ _transport.close();
}
finally
{
@@ -138,21 +151,9 @@ public class InternalBrokerBaseCase exte
public void tearDown() throws Exception
{
- try
+ if (_started)
{
- if (_started)
- {
- stopBroker();
- }
- }
- finally
- {
- super.tearDown();
- // Purge Any erroneously added actors
- while (CurrentActor.get() != null)
- {
- CurrentActor.remove();
- }
+ stopBroker();
}
}
@@ -250,7 +251,6 @@ public class InternalBrokerBaseCase exte
channel.publishContentHeader(_headerBody);
}
-
}
public void acknowledge(AMQChannel channel, long deliveryTag)
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Wed Oct 13 15:05:29 2010
@@ -27,7 +27,6 @@ import org.apache.qpid.server.security.a
import java.util.Properties;
-
public class TestApplicationRegistry extends ApplicationRegistry
{
public TestApplicationRegistry(ServerConfiguration config) throws ConfigurationException
@@ -42,7 +41,6 @@ public class TestApplicationRegistry ext
users.put("admin","admin");
_databaseManager = new PropertiesPrincipalDatabaseManager("testPasswordFile", users);
}
-
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/build.deps?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/build.deps (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/build.deps Wed Oct 13 15:05:29 2010
@@ -1,5 +1,3 @@
-backport-util-concurrent=lib/backport-util-concurrent-2.2.jar
-
commons-beanutils-core=lib/commons-beanutils-core-1.8.0.jar
commons-cli=lib/commons-cli-1.0.jar
commons-codec=lib/commons-codec-1.3.jar
@@ -21,8 +19,8 @@ jline=lib/jline-0.9.94.jar
log4j=lib/log4j-1.2.12.jar
-mina-core=lib/mina-core-1.0.1.jar
-mina-filter-ssl=lib/mina-filter-ssl-1.0.1.jar
+mina-core=lib/mina-core-1.1.7.jar
+mina-filter-ssl=lib/mina-filter-ssl-1.1.7.jar
slf4j-api=lib/slf4j-api-1.4.0.jar
slf4j-log4j=lib/slf4j-log4j12-1.4.0.jar
@@ -78,7 +76,7 @@ felix.libs=${osgi-core} ${felix-framewor
commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \
${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration}
-common.libs=${slf4j-api} ${backport-util-concurrent} ${mina-core} ${mina-filter-ssl}
+common.libs=${slf4j-api} ${mina-core} ${mina-filter-ssl}
client.libs=${geronimo-jms}
tools.libs=${commons-configuration.libs}
broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java Wed Oct 13 15:05:29 2010
@@ -50,7 +50,7 @@ public class ConnectionSetup
final static String QUEUE_NAME = "example.MyQueue";
public static final String TOPIC_JNDI_NAME = "topic";
- final static String TOPIC_NAME = "example.hierarical.topic";
+ final static String TOPIC_NAME = "usa.news";
private Context _ctx;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java Wed Oct 13 15:05:29 2010
@@ -71,7 +71,7 @@ public class Publisher extends Client
public static void main(String[] args)
{
- String destination = args.length > 2 ? args[1] : null;
+ String destination = args.length > 2 ? args[1] : "usa.news";
int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java Wed Oct 13 15:05:29 2010
@@ -21,9 +21,9 @@
package org.apache.qpid.example.transport;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
@@ -83,7 +83,7 @@ public class ExistingSocketConnectorDemo
Socket socket = SocketChannel.open().socket();
socket.connect(new InetSocketAddress("localhost", 5672));
- TransportConnection.registerOpenSocket(Socket1_ID, socket);
+ ExistingSocketConnector.registerOpenSocket(Socket1_ID, socket);
_connection = new AMQConnection(CONNECTION);
@@ -140,7 +140,7 @@ public class ExistingSocketConnectorDemo
socket.connect(new InetSocketAddress("localhost", 5673));
// This is the new method to pass in an open socket for the connection to use.
- TransportConnection.registerOpenSocket(Socket2_ID, socket);
+ ExistingSocketConnector.registerOpenSocket(Socket2_ID, socket);
}
catch (IOException e)
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java Wed Oct 13 15:05:29 2010
@@ -24,19 +24,38 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
/**
- * AMQAuthenticationException represents all failures to authenticate access to a broker.
+ * Authentication Exception represents all failures to authenticate access to a broker.
+ *
+ * This exception encapsulates error code 530, or {@link AMQConstant#NOT_ALLOWED}
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Represent failure to authenticate the client.
* </table>
- *
- * @todo Will this alwyas have the same status code, NOT_ALLOWED 530? Might set this up to always use that code.
*/
public class AMQAuthenticationException extends AMQException
{
- public AMQAuthenticationException(AMQConstant error, String msg, Throwable cause)
+ /** serialVersionUID */
+ private static final long serialVersionUID = 6045925435200184200L;
+
+ /**
+ * Creates an exception with an optional message and optional underlying cause.
+ *
+ * @param msg The exception message. May be null if not to be set.
+ * @param cause The underlying cause of the exception. May be null if not to be set.
+ */
+ public AMQAuthenticationException(String msg, Throwable cause)
+ {
+ super(AMQConstant.NOT_ALLOWED, ((msg == null) ? "Authentication error" : msg), cause);
+ }
+
+ public AMQAuthenticationException(String msg)
+ {
+ this(msg, null);
+ }
+
+ public AMQAuthenticationException()
{
- super(error, msg, cause);
+ this(null);
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Wed Oct 13 15:05:29 2010
@@ -58,6 +58,8 @@ public class AMQBrokerDetails implements
//todo this list of valid transports should be enumerated somewhere
if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
transport.equalsIgnoreCase(BrokerDetails.TCP) ||
+ transport.equalsIgnoreCase(BrokerDetails.UDP) ||
+ transport.equalsIgnoreCase(BrokerDetails.MULTICAST) ||
transport.equalsIgnoreCase(BrokerDetails.SOCKET))))
{
if (transport.equalsIgnoreCase("localhost"))
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Oct 13 15:05:29 2010
@@ -384,10 +384,10 @@ public class AMQConnection extends Close
this(new AMQConnectionURL(
useSSL
? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
}
@@ -473,7 +473,7 @@ public class AMQConnection extends Close
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
+ if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
}
@@ -1010,9 +1010,15 @@ public class AMQConnection extends Close
if (!_closed.getAndSet(true))
{
_closing.set(true);
- try{
- doClose(sessions, timeout);
- }finally{
+ try
+ {
+ synchronized (getFailoverMutex())
+ {
+ doClose(sessions, timeout);
+ }
+ }
+ finally
+ {
_closing.set(false);
}
}
@@ -1032,56 +1038,53 @@ public class AMQConnection extends Close
}
else
{
- synchronized (getFailoverMutex())
+ try
{
- try
- {
- long startCloseTime = System.currentTimeMillis();
+ long startCloseTime = System.currentTimeMillis();
- closeAllSessions(null, timeout, startCloseTime);
+ closeAllSessions(null, timeout, startCloseTime);
- //This MUST occur after we have successfully closed all Channels/Sessions
- _taskPool.shutdown();
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ _taskPool.shutdown();
- if (!_taskPool.isTerminated())
+ if (!_taskPool.isTerminated())
+ {
+ try
{
- try
- {
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
-
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConnection(timeout);
-
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interupt any running tasks.
- if (!_taskPool.isTerminated())
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
{
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
+ _logger.info("Interrupted while shutting down connection thread pool.");
}
}
- catch (AMQException e)
+
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+ _delegate.closeConnection(timeout);
+
+ //If the taskpool hasn't shutdown by now then give it shutdownNow.
+ // This will interupt any running tasks.
+ if (!_taskPool.isTerminated())
{
- _logger.error("error:", e);
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
}
}
+ catch (AMQException e)
+ {
+ _logger.error("error:", e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
}
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Wed Oct 13 15:05:29 2010
@@ -31,6 +31,7 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -39,6 +40,7 @@ import org.apache.qpid.framing.ProtocolV
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
@@ -83,23 +85,23 @@ public class AMQConnectionDelegate_0_10
{
_conn.checkNotClosed();
int channelId = _conn.getNextChannelID();
- AMQSession session;
try
{
- session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
+ AMQSession session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
_conn.registerSession(channelId, session);
if (_conn._started)
{
session.start();
}
+
+ return session;
}
catch (Exception e)
{
_logger.error("exception creating session:", e);
throw new JMSAMQException("cannot create session", e);
}
- return session;
}
/**
@@ -153,15 +155,21 @@ public class AMQConnectionDelegate_0_10
if (_logger.isDebugEnabled())
{
_logger.debug("connecting to host: " + brokerDetail.getHost()
- + " port: " + brokerDetail.getPort() + " vhost: "
- + _conn.getVirtualHost() + " username: "
- + _conn.getUsername() + " password: "
- + _conn.getPassword());
+ + " protocol: " + brokerDetail.getTransport()
+ + " port: " + brokerDetail.getPort()
+ + " vhost: " + _conn.getVirtualHost()
+ + " username: " + _conn.getUsername()
+ + " password: " + _conn.getPassword());
}
- ConnectionSettings conSettings = new ConnectionSettings();
- retriveConnectionSettings(conSettings,brokerDetail);
- _qpidConnection.connect(conSettings);
+ ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
+ SSLConfiguration sslConfig = _conn.getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
+ {
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ }
+ _qpidConnection.connect(conSettings, sslFactory);
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
@@ -173,7 +181,7 @@ public class AMQConnectionDelegate_0_10
}
catch (ConnectionException ce)
{
- AMQConstant code = AMQConstant.REPLY_SUCCESS;
+ AMQConstant code = AMQConstant.CHANNEL_ERROR;
if (ce.getClose() != null && ce.getClose().getReplyCode() != null)
{
code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
@@ -198,6 +206,7 @@ public class AMQConnectionDelegate_0_10
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
_logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _qpidConnection.resume();
for (AMQSession s : sessions)
{
((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
@@ -259,24 +268,7 @@ public class AMQConnectionDelegate_0_10
}
}
- ExceptionListener listener = _conn._exceptionListener;
- if (listener == null)
- {
- _logger.error("connection exception: " + conn, exc);
- }
- else
- {
- String code = null;
- if (close != null)
- {
- code = close.getReplyCode().toString();
- }
-
- JMSException ex = new JMSException(exc.getMessage(), code);
- ex.setLinkedException(exc);
- ex.initCause(exc);
- listener.onException(ex);
- }
+ _conn.exceptionReceived(new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
}
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
@@ -301,14 +293,15 @@ public class AMQConnectionDelegate_0_10
return ProtocolVersion.v0_10;
}
- private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail)
+ private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
{
-
+ ConnectionSettings conSettings = new ConnectionSettings();
conSettings.setHost(brokerDetail.getHost());
conSettings.setPort(brokerDetail.getPort());
conSettings.setVhost(_conn.getVirtualHost());
conSettings.setUsername(_conn.getUsername());
conSettings.setPassword(_conn.getPassword());
+ conSettings.setProtocol(brokerDetail.getTransport());
// ------------ sasl options ---------------
if (brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null)
@@ -386,11 +379,12 @@ public class AMQConnectionDelegate_0_10
if (brokerDetail.getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null)
{
- conSettings.setTcpNodelay(
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY));
+ conSettings.setTcpNodelay(brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY));
}
conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+
+ return conSettings;
}
// The idle_timeout prop is in milisecs while
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Wed Oct 13 15:05:29 2010
@@ -38,7 +38,6 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -48,6 +47,12 @@ import org.apache.qpid.framing.TxSelectB
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,25 +90,30 @@ public class AMQConnectionDelegate_8_0 i
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
-
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- // TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
- {
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- }
- else
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(brokerDetail.getHost());
+ settings.setPort(brokerDetail.getPort());
+ settings.setProtocol(brokerDetail.getTransport());
+
+ SSLConfiguration sslConfig = _conn.getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransport();
+ NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory);
+ _conn._protocolHandler.connect(transport, network);
_conn._protocolHandler.getProtocolSession().init();
+
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
-
AMQState state = waiter.await();
- if(state == AMQState.CONNECTION_OPEN)
+ if (state == AMQState.CONNECTION_OPEN)
{
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Wed Oct 13 15:05:29 2010
@@ -567,7 +567,7 @@ public abstract class AMQDestination imp
{
return true;
}
- if (o == null || getClass() != o.getClass())
+ if (o == null || !(getClass().isAssignableFrom(o.getClass()) || o.getClass().isAssignableFrom(getClass())))
{
return false;
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Oct 13 15:05:29 2010
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -303,12 +304,8 @@ public abstract class AMQSession<C exten
protected final Lock _subscriberDetails = new ReentrantLock(true);
protected final Lock _subscriberAccess = new ReentrantLock(true);
- /**
- * Used to hold incoming messages.
- *
- * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
- */
- protected final FlowControllingBlockingQueue _queue;
+ /** Used to hold incoming messages. */
+ protected final BlockingQueue<Dispatchable> _queue;
/** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
@@ -463,8 +460,7 @@ public abstract class AMQSession<C exten
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
- _queue =
- new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
+ _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -515,7 +511,7 @@ public abstract class AMQSession<C exten
}
else
{
- _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue<Dispatchable>();
}
// Add creation logging to tie in with the existing close logging
@@ -704,16 +700,16 @@ public abstract class AMQSession<C exten
// Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
- // Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (getFailoverMutex())
{
- _closing.set(true);
- synchronized (getFailoverMutex())
- {
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_messageDeliveryLock)
- {
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
+ {
+ _closing.set(true);
+ synchronized (_messageDeliveryLock)
+ {
// we pass null since this is not an error case
closeProducersAndConsumers(null);
@@ -1366,21 +1362,11 @@ public abstract class AMQSession<C exten
public StreamMessage createStreamMessage() throws JMSException
{
- // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
- // calls through connection.closeAllSessions which is also called by the public connection.close()
- // with a null cause
- // When we are closing the Session due to a protocol session error we simply create a new AMQException
- // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
- // We need to determin here if the connection should be
-
- synchronized (getFailoverMutex())
- {
- checkNotClosed();
+ checkNotClosed();
- JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
- msg.setAMQSession(this);
- return msg;
- }
+ JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
/**
@@ -1454,14 +1440,11 @@ public abstract class AMQSession<C exten
public TextMessage createTextMessage() throws JMSException
{
- synchronized (getFailoverMutex())
- {
- checkNotClosed();
+ checkNotClosed();
- JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory());
- msg.setAMQSession(this);
- return msg;
- }
+ JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
protected Object getFailoverMutex()
@@ -2268,7 +2251,7 @@ public abstract class AMQSession<C exten
_dispatcher = new Dispatcher();
try
{
- _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+ _dispatcherThread = Threading.getThreadFactory().newThread(_dispatcher);
}
catch(Exception e)
@@ -2918,7 +2901,7 @@ public abstract class AMQSession<C exten
{
ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
_consumers.clear();
-
+ _logger.info(MessageFormat.format("Resubscribing consumers = {0} consumers.size={1}", consumers, consumers.size())); // FIXME: removeKey
for (C consumer : consumers)
{
consumer.failedOverPre();
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Oct 13 15:05:29 2010
@@ -1020,9 +1020,9 @@ public class AMQSession_0_10 extends AMQ
if (assertNode)
{
match = (result.getDurable() == node.isDurable()) &&
- (node.getExchangeType() != null &&
- node.getExchangeType().equals(result.getType())) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) &&
+ (node.getDeclareArgs() != null && result.hasArguments() &&
+ matchProps(result.getArguments(), node.getDeclareArgs()));
}
else if (node.getExchangeType() != null)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org