You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [34/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java Thu Feb 28 16:14:30 2013
@@ -21,14 +21,75 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
import java.util.List;
-public class QueueBrowserUsesNoAckTest extends InternalBrokerBaseCase
+public class QueueBrowserUsesNoAckTest extends QpidTestCase
{
+ private AMQChannel _channel;
+ private SimpleAMQQueue _queue;
+ private MessageStore _messageStore;
+ private String _queueName;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ BrokerTestHelper.setUp();
+ _channel = BrokerTestHelper.createChannel();
+ VirtualHost virtualHost = _channel.getVirtualHost();
+ _queueName = getTestName();
+ _queue = BrokerTestHelper.createQueue(_queueName, virtualHost);
+ _messageStore = virtualHost.getMessageStore();
+ Exchange defaultExchange = virtualHost.getExchangeRegistry().getDefaultExchange();
+ virtualHost.getBindingFactory().addBinding(_queueName, _queue, defaultExchange, null);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.getVirtualHost().close();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
+ }
+
+ private AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ private InternalTestProtocolSession getSession()
+ {
+ return (InternalTestProtocolSession)_channel.getProtocolSession();
+ }
+
+ private SimpleAMQQueue getQueue()
+ {
+ return _queue;
+ }
public void testQueueBrowserUsesNoAck() throws AMQException
{
@@ -39,7 +100,7 @@ public class QueueBrowserUsesNoAckTest e
checkStoreContents(0);
//Send required messsages to the queue
- publishMessages(getSession(), getChannel(), sendMessageCount);
+ BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
//Ensure they are stored
checkStoreContents(sendMessageCount);
@@ -74,4 +135,16 @@ public class QueueBrowserUsesNoAckTest e
.equals(Subscription.State.SUSPENDED));
}
+ private void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
+ }
+
+ private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException
+ {
+ FieldTable filters = new FieldTable();
+ filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+
+ return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java Thu Feb 28 16:14:30 2013
@@ -23,20 +23,55 @@ package org.apache.qpid.server.subscript
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.UnitTestMessageLogger;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.ProtocolEngine_0_10;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.transport.ServerSessionDelegate;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.TestNetworkConnection;
-public class SubscriptionFactoryImplTest extends InternalBrokerBaseCase
+public class SubscriptionFactoryImplTest extends QpidTestCase
{
+ private AMQChannel _channel;
+ private AMQProtocolSession _session;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ BrokerTestHelper.setUp();
+ _channel = BrokerTestHelper.createChannel();
+ _session = _channel.getProtocolSession();
+ GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false));
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.getVirtualHost().close();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
+ }
+
/**
* Tests that while creating Subscriptions of various types, the
* ID numbers assigned are allocated from a common sequence
@@ -46,35 +81,34 @@ public class SubscriptionFactoryImplTest
{
//create a No-Ack subscription, get the first Subscription ID
long previousId = 0;
- Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), false, null, false, getChannel().getCreditManager());
+ Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager());
previousId = noAckSub.getSubscriptionID();
//create an ack subscription, verify the next Subscription ID is used
- Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+ Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
previousId = ackSub.getSubscriptionID();
//create a browser subscription
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+ Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID());
previousId = browerSub.getSubscriptionID();
//create an BasicGet NoAck subscription
- Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(getChannel(), getSession(), new AMQShortString("1"), null, false,
- getChannel().getCreditManager(),getChannel().getClientDeliveryMethod(), getChannel().getRecordDeliveryMethod());
+ Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false,
+ _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod());
assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
previousId = getNoAckSub.getSubscriptionID();
//create a 0-10 subscription
ServerConnection conn = new ServerConnection(1);
- ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), getRegistry());
- conn.setVirtualHost(getVirtualHost());
- conn.setConnectionConfig(engine);
+ ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection());
+ conn.setVirtualHost(_session.getVirtualHost());
ServerSessionDelegate sesDel = new ServerSessionDelegate();
Binary name = new Binary(new byte[]{new Byte("1")});
- ServerSession session = new ServerSession(conn, sesDel, name, 0, engine);
+ ServerSession session = new ServerSession(conn, sesDel, name, 0);
Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java Thu Feb 28 16:14:30 2013
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,19 +15,17 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.qpid.server.transport;
-import java.util.UUID;
-
-import org.apache.qpid.server.configuration.MockConnectionConfig;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
-public class ServerSessionTest extends InternalBrokerBaseCase
+public class ServerSessionTest extends QpidTestCase
{
private VirtualHost _virtualHost;
@@ -37,31 +34,44 @@ public class ServerSessionTest extends I
public void setUp() throws Exception
{
super.setUp();
- _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ BrokerTestHelper.setUp();
+ _virtualHost = BrokerTestHelper.createVirtualHost(getName());
+ GenericActor.setDefaultMessageLogger(CurrentActor.get().getRootMessageLogger());
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ if (_virtualHost != null)
+ {
+ _virtualHost.close();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
}
public void testCompareTo() throws Exception
{
ServerConnection connection = new ServerConnection(1);
- connection.setConnectionConfig(createConnectionConfig());
+ connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0 , connection.getConfig());
+ new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
ServerConnection connection2 = new ServerConnection(2);
- connection2.setConnectionConfig(createConnectionConfig());
+ connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0 , connection2.getConfig());
+ new Binary(getName().getBytes()), 0);
assertFalse("Unexpected compare result", session1.compareTo(session2) == 0);
assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
}
- private MockConnectionConfig createConnectionConfig()
- {
- return new MockConnectionConfig(UUID.randomUUID(), null, null,
- false, 1, _virtualHost, "address", Boolean.TRUE, Boolean.TRUE, Boolean.TRUE,
- "authid", "remoteProcessName", new Integer(1967), new Integer(1970), _virtualHost.getConfigStore(), Boolean.FALSE);
- }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Thu Feb 28 16:14:30 2013
@@ -82,7 +82,7 @@ public class AsyncAutoCommitTransactionT
AsyncAutoCommitTransaction asyncAutoCommitTransaction =
new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
- asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis());
+ asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction);
verify(_storeTransaction).enqueueMessage(_queue, _message);
verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Thu Feb 28 16:14:30 2013
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest e
try
{
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
fail("Exception not thrown");
}
catch (RuntimeException re)
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Thu Feb 28 16:14:30 2013
@@ -140,7 +140,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extend
try
{
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
fail("Exception not thrown");
}
catch (RuntimeException re)
@@ -217,7 +217,7 @@ public class LocalTransactionTest extend
{
_message = createTestMessage(false);
_queue = createTestAMQQueue(false);
-
+
_transaction.dequeue(_queue, _message, _action1);
assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
@@ -465,7 +465,6 @@ public class LocalTransactionTest extend
*/
public void testRollbackWorkWithAdditionalPostAction() throws Exception
{
-
_message = createTestMessage(true);
_queue = createTestAMQQueue(true);
@@ -482,6 +481,122 @@ public class LocalTransactionTest extend
assertTrue("Rollback action2 must be fired", _action1.isRollbackActionFired());
}
+ public void testFirstEnqueueRecordsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+ assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+ }
+
+ public void testSubsequentEnqueueAdvancesTransactionUpdateTimeOnly() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ _transaction.enqueue(_queue, _message, _action1);
+
+ final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+ Thread.sleep(1);
+ _transaction.enqueue(_queue, _message, _action2);
+
+ final long transactionStartTimeAfterSecondEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterSecondEnqueue = _transaction.getTransactionUpdateTime();
+
+ assertEquals("Transaction start time after second enqueue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterSecondEnqueue);
+ assertTrue("Transaction update time after second enqueue should be greater than first update time", transactionUpdateTimeAfterSecondEnqueue > transactionUpdateTimeAfterFirstEnqueue);
+ }
+
+ public void testFirstDequeueRecordsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.dequeue(_queue, _message, _action1);
+
+ assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+ assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+ }
+
+ public void testMixedEnqueuesAndDequeuesAdvancesTransactionUpdateTimeOnly() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ _transaction.enqueue(_queue, _message, _action1);
+
+ final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+ Thread.sleep(1);
+ _transaction.dequeue(_queue, _message, _action2);
+
+ final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime();
+
+ assertEquals("Transaction start time after first dequeue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterFirstDequeue);
+ assertTrue("Transaction update time after first dequeue should be greater than first update time", transactionUpdateTimeAfterFirstDequeue > transactionUpdateTimeAfterFirstEnqueue);
+ }
+
+ public void testCommitResetsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue(_transaction.getTransactionStartTime() >= startTime);
+ assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+ _transaction.commit();
+
+ assertEquals("Transaction start time should be reset after commit", 0, _transaction.getTransactionStartTime());
+ assertEquals("Transaction update time should be reset after commit", 0, _transaction.getTransactionUpdateTime());
+ }
+
+ public void testRollbackResetsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue(_transaction.getTransactionStartTime() >= startTime);
+ assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+ _transaction.rollback();
+
+ assertEquals("Transaction start time should be reset after rollback", 0, _transaction.getTransactionStartTime());
+ assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
+ }
+
private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Feb 28 16:14:30 2013
@@ -22,7 +22,6 @@ package org.apache.qpid.server.txn;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -68,11 +67,6 @@ class MockServerMessage implements Serve
throw new NotImplementedException();
}
- public SessionConfig getSessionConfig()
- {
- throw new NotImplementedException();
- }
-
public String getRoutingKey()
{
throw new NotImplementedException();
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Thu Feb 28 16:14:30 2013
@@ -22,26 +22,18 @@ package org.apache.qpid.server.virtualho
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.BrokerConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.VirtualHostConfigType;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
-import java.util.Map;
import java.util.UUID;
public class MockVirtualHost implements VirtualHost
@@ -58,19 +50,8 @@ public class MockVirtualHost implements
}
- public void createBrokerConnection(String transport, String host, int port,
- String vhost, boolean durable, String authMechanism,
- String username, String password)
- {
-
- }
-
- public BrokerLink createBrokerConnection(final UUID id, final long createTime, final Map<String, String> arguments)
- {
- return null;
- }
-
- public IApplicationRegistry getApplicationRegistry()
+ @Override
+ public VirtualHostRegistry getVirtualHostRegistry()
{
return null;
}
@@ -85,16 +66,6 @@ public class MockVirtualHost implements
return null;
}
- public UUID getBrokerId()
- {
- return null;
- }
-
- public ConfigStore getConfigStore()
- {
- return null;
- }
-
public DtxRegistry getDtxRegistry()
{
return null;
@@ -160,12 +131,6 @@ public class MockVirtualHost implements
return null;
}
-
- public void removeBrokerConnection(BrokerLink brokerLink)
- {
-
- }
-
public LinkRegistry getLinkRegistry(String remoteContainerId)
{
return null;
@@ -186,25 +151,6 @@ public class MockVirtualHost implements
}
- public BrokerConfig getBroker()
- {
- return null;
- }
-
- public String getFederationTag()
- {
- return null;
- }
-
- public void setBroker(BrokerConfig brokerConfig)
- {
-
- }
-
- public VirtualHostConfigType getConfigType()
- {
- return null;
- }
public long getCreateTime()
{
@@ -216,17 +162,6 @@ public class MockVirtualHost implements
return null;
}
- @Override
- public UUID getQMFId()
- {
- return null;
- }
-
- public ConfiguredObject<VirtualHostConfigType, VirtualHostConfig> getParent()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java Thu Feb 28 16:14:30 2013
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.mockito.Mockito.mock;
+
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import java.io.BufferedWriter;
@@ -38,15 +44,31 @@ import java.io.IOException;
public class VirtualHostImplTest extends QpidTestCase
{
- private ServerConfiguration _configuration;
- private ApplicationRegistry _registry;
+ private VirtualHostRegistry _virtualHostRegistry;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ BrokerTestHelper.setUp();
+ }
@Override
public void tearDown() throws Exception
{
- super.tearDown();
+ try
+ {
+ if (_virtualHostRegistry != null)
+ {
+ _virtualHostRegistry.close();
+ }
+ }
+ finally
+ {
+ BrokerTestHelper.tearDown();
+ super.tearDown();
+ }
- ApplicationRegistry.remove();
}
/**
@@ -74,17 +96,23 @@ public class VirtualHostImplTest extends
*/
public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception
{
- File config = writeConfigFile(getName(), getName(), null, false, new String[]{"custom-binding"});
+ final String queueName = getName();
+ final String customBinding = "custom-binding";
+ File config = writeConfigFile(queueName, queueName, null, false, new String[]{customBinding});
try
{
- createVirtualHost(getName(), config);
+ createVirtualHost(queueName, config);
fail("virtualhost creation should have failed due to illegal configuration");
}
catch (RuntimeException e)
{
+ assertNotNull(e.getCause());
+
assertEquals(ConfigurationException.class, e.getCause().getClass());
- //expected
+
+ Throwable configException = e.getCause();
+ assertEquals("Illegal attempt to bind queue '" + queueName + "' to the default exchange with a key other than the queue name: " + customBinding, configException.getMessage());
}
}
@@ -96,6 +124,14 @@ public class VirtualHostImplTest extends
assertEquals(State.ACTIVE, vhost.getState());
}
+ public void testVirtualHostHavingStoreSetAsTypeBecomesActive() throws Exception
+ {
+ String virtualHostName = getName();
+ VirtualHost host = createVirtualHostUsingStoreType(virtualHostName);
+ assertNotNull(host);
+ assertEquals(State.ACTIVE, host.getState());
+ }
+
public void testVirtualHostBecomesStoppedOnClose() throws Exception
{
File config = writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]);
@@ -107,22 +143,39 @@ public class VirtualHostImplTest extends
assertEquals(0, vhost.getHouseKeepingActiveCount());
}
+ public void testVirtualHostHavingStoreSetAsTypeBecomesStoppedOnClose() throws Exception
+ {
+ String virtualHostName = getName();
+ VirtualHost host = createVirtualHostUsingStoreType(virtualHostName);
+ assertNotNull(host);
+ assertEquals(State.ACTIVE, host.getState());
+ host.close();
+ assertEquals(State.STOPPED, host.getState());
+ assertEquals(0, host.getHouseKeepingActiveCount());
+ }
+
/**
* Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost
*/
public void testSpecifyingUnknownExchangeThrowsException() throws Exception
{
- File config = writeConfigFile(getName(), getName(), "made-up-exchange", true, new String[0]);
+ final String queueName = getName();
+ final String exchangeName = "made-up-exchange";
+ File config = writeConfigFile(queueName, queueName, exchangeName, true, new String[0]);
try
{
- createVirtualHost(getName(), config);
+ createVirtualHost(queueName, config);
fail("virtualhost creation should have failed due to illegal configuration");
}
catch (RuntimeException e)
{
+ assertNotNull(e.getCause());
+
assertEquals(ConfigurationException.class, e.getCause().getClass());
- //expected
+
+ Throwable configException = e.getCause();
+ assertEquals("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName, configException.getMessage());
}
}
@@ -154,12 +207,14 @@ public class VirtualHostImplTest extends
private VirtualHost createVirtualHost(String vhostName, File config) throws Exception
{
- _configuration = new ServerConfiguration(new XMLConfiguration(config));
+ Broker broker = BrokerTestHelper.createBrokerMock();
+ _virtualHostRegistry = broker.getVirtualHostRegistry();
- _registry = new TestApplicationRegistry(_configuration);
- ApplicationRegistry.initialise(_registry);
+ VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker);
+ VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(null), configuration);
+ _virtualHostRegistry.registerVirtualHost(host);
- return _registry.getVirtualHostRegistry().getVirtualHost(vhostName);
+ return host;
}
/**
@@ -184,7 +239,6 @@ public class VirtualHostImplTest extends
BufferedWriter writer = new BufferedWriter(fstream);
//extra outer tag to please Commons Configuration
- writer.write("<configuration>");
writer.write("<virtualhosts>");
writer.write(" <default>" + vhostName + "</default>");
@@ -222,8 +276,6 @@ public class VirtualHostImplTest extends
writer.write(" </virtualhost>");
writer.write("</virtualhosts>");
- writer.write("</configuration>");
-
writer.flush();
writer.close();
}
@@ -234,4 +286,17 @@ public class VirtualHostImplTest extends
return tmpFile;
}
+
+ private VirtualHost createVirtualHostUsingStoreType(String virtualHostName) throws ConfigurationException, Exception
+ {
+ Broker broker = BrokerTestHelper.createBrokerMock();
+ _virtualHostRegistry = broker.getVirtualHostRegistry();
+
+ Configuration config = new PropertiesConfiguration();
+ config.setProperty("store.type", MemoryMessageStore.TYPE);
+ VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker);
+ VirtualHost host = new VirtualHostImpl(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(null), configuration);
+ _virtualHostRegistry.registerVirtualHost(host);
+ return host;
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm (original)
+++ qpid/branches/asyncstore/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm Thu Feb 28 16:14:30 2013
@@ -23,8 +23,8 @@ package ${package};
import static org.apache.qpid.server.logging.AbstractRootMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import java.text.MessageFormat;
import java.util.Locale;
@@ -44,7 +44,7 @@ import java.util.ResourceBundle;
public class ${type.name}Messages
{
private static ResourceBundle _messages;
- private static Locale _currentLocale;
+ private static Locale _currentLocale = BrokerProperties.getLocale();
public static final String ${type.name.toUpperCase()}_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "${type.name.toLowerCase()}";
#foreach( $message in ${type.list} )
@@ -58,24 +58,9 @@ public class ${type.name}Messages
Logger.getLogger(${message.methodName.toUpperCase()}_LOG_HIERARCHY);
#end
- reload();
- }
-
- public static void reload()
- {
- if (ApplicationRegistry.isConfigured())
- {
- _currentLocale = ApplicationRegistry.getInstance().getConfiguration().getLocale();
- }
- else
- {
- _currentLocale = Locale.getDefault();
- }
-
_messages = ResourceBundle.getBundle("${resource}", _currentLocale);
}
-
##
## The list stored under key 'list' in the 'type' HashMap contains all the
## log messages that this class should contain. So for each entry in the list
Modified: qpid/branches/asyncstore/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/build.deps?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/build.deps (original)
+++ qpid/branches/asyncstore/java/build.deps Thu Feb 28 16:14:30 2013
@@ -59,10 +59,6 @@ servlet-api=${geronimo-servlet}
dojo=lib/required/dojo-war-1.7.2.war
-felix-main=lib/required/org.apache.felix.main-2.0.5.jar
-
-felix.libs=${felix-main}
-
jackson-core=lib/required/jackson-core-asl-1.9.0.jar
jackson-mapper=lib/required/jackson-mapper-asl-1.9.0.jar
@@ -72,19 +68,20 @@ commons-configuration.libs = ${commons-b
common.libs=${slf4j-api}
client.libs=${geronimo-jms}
amqp-1-0-common.libs=
-amqp-1-0-client.libs=${commons-cli}
+amqp-1-0-client.libs=
+amqp-1-0-client-example.libs=${commons-cli}
amqp-1-0-client-jms.libs=${geronimo-jms}
tools.libs=${commons-configuration.libs} ${log4j}
broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
- ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} \
+ ${xalan} ${derby-db} ${commons-configuration.libs} \
${jackson-core} ${jackson-mapper} ${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jetty-websocket}
broker-plugins-management-http.libs=${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jackson-core} ${jackson-mapper}
-broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
+broker-plugins.libs=${log4j} ${commons-configuration.libs}
test.libs=${slf4j-log4j} ${log4j} ${junit} ${slf4j-api} ${mockito-all}
-perftests.libs=${geronimo-jms} ${slf4j-api} ${log4j} ${slf4j-log4j} ${commons-logging} ${commons-collections} ${commons-beanutils-core} ${commons-lang} ${gson-all}
+perftests.libs=${geronimo-jms} ${slf4j-api} ${log4j} ${slf4j-log4j} ${commons-logging} ${commons-collections} ${commons-beanutils-core} ${commons-lang} ${gson-all} ${derby-db}
management-common.libs=
@@ -93,11 +90,12 @@ broker.test.libs=${test.libs}
client.test.libs=${test.libs}
client-example.test.libs=${test.libs}
tools.test.libs=
-testkit.test.libs=${test.libs}
systests.libs=${test.libs}
perftests.test.libs=${test.libs}
-broker-plugins.test.libs=${test.libs}
+broker-plugins-access-control.test.libs=${test.libs}
+broker-plugins-management-http.test.libs=${test.libs}
+broker-plugins-management-jmx.test.libs=${test.libs}
management-common.test.libs=${test.libs}
@@ -117,10 +115,12 @@ bdbstore-jmx.test.libs=${test.libs}
jfreechart.jar=lib/jfree/jfreechart-1.0.13.jar
jcommon.jar=lib/jfree/jcommon-1.0.16.jar
csvjdbc.jar=lib/csvjdbc/csvjdbc-1.0.8.jar
-perftests-visualisation-jfc.libs=${jfreechart.jar} ${jcommon.jar} ${csvjdbc.jar}
+perftests-visualisation-jfc.libs=${jfreechart.jar} ${jcommon.jar} ${csvjdbc.jar} ${derby-db}
perftests-visualisation-jfc.test.libs=${test.libs}
# Libraries used only within the build
bnd=lib/required/bnd-0.0.384.jar
jython=lib/required/jython-standalone-2.5.2.jar
maven-ant-tasks=lib/required/maven-ant-tasks-2.1.1.jar
+velocity.jar=lib/required/velocity-1.4.jar
+velocity-dep.jar=lib/required/velocity-dep-1.4.jar
Modified: qpid/branches/asyncstore/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/build.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/build.xml (original)
+++ qpid/branches/asyncstore/java/build.xml Thu Feb 28 16:14:30 2013
@@ -32,7 +32,7 @@
</condition>
<property name="modules.core" value="common management/common amqp-1-0-common broker client amqp-1-0-client amqp-1-0-client-jms tools"/>
- <property name="modules.examples" value="client/example management/example"/>
+ <property name="modules.examples" value="client/example management/example amqp-1-0-client/example amqp-1-0-client-jms/example"/>
<property name="modules.tests" value="systests perftests"/>
<property name="modules.plugin" value="${broker-plugins} ${client-plugins}"/>
<property name="modules.jca" value="jca"/>
@@ -77,6 +77,10 @@
<iterate target="release-mvn"/>
</target>
+ <target name="deploy-snapshot" description="deploy snapshot artifacts to nexus">
+ <iterate target="deploy-snapshot"/>
+ </target>
+
<target name="compile" description="compile sources">
<iterate target="compile"/>
</target>
Modified: qpid/branches/asyncstore/java/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/build.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/build.xml (original)
+++ qpid/branches/asyncstore/java/client/build.xml Thu Feb 28 16:14:30 2013
@@ -21,7 +21,7 @@
<project name="AMQ Client" default="build">
<property name="module.depends" value="common"/>
- <property name="module.test.depends" value="common/test" />
+ <property name="module.test.depends" value="common/tests" />
<property name="module.genpom" value="true"/>
<property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
Modified: qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Drain.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Drain.java (original)
+++ qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Drain.java Thu Feb 28 16:14:30 2013
@@ -88,7 +88,7 @@ public class Drain extends OptionParser
}
}
}
-
+ consumer.close();
ssn.close();
con.close();
}
Modified: qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Spout.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Spout.java (original)
+++ qpid/branches/asyncstore/java/client/example/src/main/java/org/apache/qpid/example/Spout.java Thu Feb 28 16:14:30 2013
@@ -100,6 +100,7 @@ public class Spout extends OptionParser
System.out.println(msg);
System.out.println("-------------------------------\n");
}
+ producer.close();
ssn.close();
con.close();
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/client.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/client.bnd?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/client.bnd (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/client.bnd Thu Feb 28 16:14:30 2013
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Thu Feb 28 16:14:30 2013
@@ -344,7 +344,14 @@ public class AMQBrokerDetails implements
optionsURL.append("='");
- optionsURL.append(_options.get(key));
+ if (OPTIONS_TRUST_STORE_PASSWORD.equals(key) || OPTIONS_KEY_STORE_PASSWORD.equals(key))
+ {
+ optionsURL.append("********");
+ }
+ else
+ {
+ optionsURL.append(_options.get(key));
+ }
optionsURL.append("'");
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Feb 28 16:14:30 2013
@@ -179,6 +179,13 @@ public class AMQConnection extends Close
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ // Indicates whether to use the old stream message format or the
+ // new amqp-0-10 list encoded format.
+ private boolean _useLegacyStreamMessageFormat;
+
+ // When sending to a Queue destination for the first time, check that the queue is bound
+ private final boolean _validateQueueOnSend;
+
//used to track the last failover time for
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
@@ -294,6 +301,30 @@ public class AMQConnection extends Close
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null)
+ {
+ _useLegacyStreamMessageFormat = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ?
+ true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
+ }
+
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
+ {
+ _validateQueueOnSend = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+ }
+ else
+ {
+ _validateQueueOnSend =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
+ }
+
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
{
@@ -1080,7 +1111,7 @@ public class AMQConnection extends Close
return _started;
}
- protected final boolean isConnected()
+ public final boolean isConnected()
{
return _connected;
}
@@ -1425,7 +1456,7 @@ public class AMQConnection extends Close
{
return _delegate.getProtocolVersion();
}
-
+
public String getBrokerUUID()
{
if(getProtocolVersion().equals(ProtocolVersion.v0_10))
@@ -1498,6 +1529,11 @@ public class AMQConnection extends Close
return _useLegacyMapMessageFormat;
}
+ public boolean isUseLegacyStreamMessageFormat()
+ {
+ return _useLegacyStreamMessageFormat;
+ }
+
private void verifyClientID() throws AMQException
{
if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
@@ -1539,4 +1575,14 @@ public class AMQConnection extends Close
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
+
+ public boolean validateQueueOnSend()
+ {
+ return _validateQueueOnSend;
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Feb 28 16:14:30 2013
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
* @return true if the feature is supported by the server
*/
boolean isSupportedServerFeature(final String featureName);
+
+ void setHeartbeatListener(HeartbeatListener listener);
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Feb 28 16:14:30 2013
@@ -33,6 +33,7 @@ import org.apache.qpid.configuration.Cli
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
@@ -214,7 +215,8 @@ public class AMQConnectionDelegate_0_10
+ "********");
}
- ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
+ ConnectionSettings conSettings = retrieveConnectionSettings(brokerDetail);
+
_qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL()));
_qpidConnection.connect(conSettings);
@@ -420,7 +422,13 @@ public class AMQConnectionDelegate_0_10
return featureSupported;
}
- private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+ }
+
+ private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
@@ -442,6 +450,24 @@ public class AMQConnectionDelegate_0_10
conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+ //Check connection-level ssl override setting
+ String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+ if(connectionSslOption != null)
+ {
+ boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+ boolean brokerlistUseSsl = conSettings.isUseSSL();
+
+ if( connUseSsl != brokerlistUseSsl)
+ {
+ conSettings.setUseSSL(connUseSsl);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+ }
+ }
+ }
+
return conSettings;
}
@@ -464,10 +490,14 @@ public class AMQConnectionDelegate_0_10
heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
_logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
}
- else
+ else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
}
+ else
+ {
+ heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
+ }
return heartbeat;
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Feb 28 16:14:30 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.TxSelectB
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -90,42 +91,43 @@ public class AMQConnectionDelegate_8_0 i
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connecting to broker:" + brokerDetail);
+ }
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
-
- StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
-
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
settings.setProtocol(brokerDetail.getTransport());
- SSLContext sslContext = null;
- if (settings.isUseSSL())
+ //Check connection-level ssl override setting
+ String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+ if(connectionSslOption != null)
{
- try
- {
- sslContext = SSLContextFactory.buildClientContext(
- settings.getTrustStorePath(),
- settings.getTrustStorePassword(),
- settings.getTrustStoreType(),
- settings.getTrustManagerFactoryAlgorithm(),
- settings.getKeyStorePath(),
- settings.getKeyStorePassword(),
- settings.getKeyStoreType(),
- settings.getKeyManagerFactoryAlgorithm(),
- settings.getCertAlias());
- }
- catch (GeneralSecurityException e)
+ boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+ boolean brokerlistUseSsl = settings.isUseSSL();
+
+ if( connUseSsl != brokerlistUseSsl)
{
- throw new AMQException("Unable to create SSLContext: " + e.getMessage(), e);
+ settings.setUseSSL(connUseSsl);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+ }
}
}
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
+
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+ _conn.getProtocolHandler());
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
+
+ StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
_conn.getProtocolHandler().getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
@@ -376,4 +378,10 @@ public class AMQConnectionDelegate_8_0 i
// we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
+
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _conn.getProtocolHandler().setHeartbeatListener(listener);
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Thu Feb 28 16:14:30 2013
@@ -140,7 +140,7 @@ public class AMQConnectionFactory implem
{
try
{
- ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.toString());
+ ConnectionURL connectionDetails = new AMQConnectionURL(_connectionDetails.getURL());
connectionDetails.setUsername(userName);
connectionDetails.setPassword(password);
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Feb 28 16:14:30 2013
@@ -52,6 +52,12 @@ public abstract class AMQDestination imp
private AMQShortString _exchangeClass;
+ private boolean _exchangeAutoDelete;
+
+ private boolean _exchangeDurable;
+
+ private boolean _exchangeInternal;
+
private boolean _isDurable;
private boolean _isExclusive;
@@ -106,16 +112,6 @@ public abstract class AMQDestination imp
_name = name;
}
- protected Link getTargetLink()
- {
- return _targetLink;
- }
-
- protected void setTargetLink(Link targetLink)
- {
- _targetLink = targetLink;
- }
-
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -180,10 +176,9 @@ public abstract class AMQDestination imp
private AddressOption _assert = AddressOption.NEVER;
private AddressOption _delete = AddressOption.NEVER;
- private Node _targetNode;
- private Node _sourceNode;
- private Link _targetLink;
+ private Node _node;
private Link _link;
+
// ----- / Fields required to support new address syntax -------
@@ -280,6 +275,9 @@ public abstract class AMQDestination imp
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
+ _exchangeDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_DURABLE));
+ _exchangeAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_AUTODELETE));
+ _exchangeInternal = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCHANGE_INTERNAL));
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
@@ -358,6 +356,10 @@ public abstract class AMQDestination imp
_destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
_rejectBehaviour = null;
+ _exchangeAutoDelete = false;
+ _exchangeDurable = false;
+ _exchangeInternal = false;
+
if (_logger.isDebugEnabled())
{
_logger.debug("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
@@ -412,6 +414,21 @@ public abstract class AMQDestination imp
return _exchangeClass;
}
+ public boolean isExchangeDurable()
+ {
+ return _exchangeDurable;
+ }
+
+ public boolean isExchangeAutoDelete()
+ {
+ return _exchangeAutoDelete;
+ }
+
+ public boolean isExchangeInternal()
+ {
+ return _exchangeInternal;
+ }
+
public boolean isTopic()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(_exchangeClass);
@@ -579,6 +596,27 @@ public abstract class AMQDestination imp
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+ if (_exchangeDurable)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_exchangeAutoDelete)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
+ if (_exchangeInternal)
+ {
+ sb.append(BindingURL.OPTION_EXCHANGE_INTERNAL);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+
//removeKey the last char '?' if there is no options , ',' if there are.
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
@@ -773,24 +811,14 @@ public abstract class AMQDestination imp
_delete = option;
}
- public Node getTargetNode()
+ public Node getNode()
{
- return _targetNode;
+ return _node;
}
- public void setTargetNode(Node node)
+ public void setNode(Node node)
{
- _targetNode = node;
- }
-
- public Node getSourceNode()
- {
- return _sourceNode;
- }
-
- public void setSourceNode(Node node)
- {
- _sourceNode = node;
+ _node = node;
}
public Link getLink()
@@ -851,21 +879,11 @@ public abstract class AMQDestination imp
_browseOnly = _addrHelper.isBrowseOnly();
- _addressType = _addrHelper.getTargetNodeType();
- _targetNode = _addrHelper.getTargetNode(_addressType);
- _sourceNode = _addrHelper.getSourceNode(_addressType);
+ _addressType = _addrHelper.getNodeType();
+ _node = _addrHelper.getNode();
_link = _addrHelper.getLink();
}
- // This method is needed if we didn't know the node type at the beginning.
- // Therefore we have to query the broker to figure out the type.
- // Once the type is known we look for the necessary properties.
- public void rebuildTargetAndSourceNodes(int addressType)
- {
- _targetNode = _addrHelper.getTargetNode(addressType);
- _sourceNode = _addrHelper.getSourceNode(addressType);
- }
-
// ----- / new address syntax -----------
public boolean isBrowseOnly()
@@ -900,8 +918,7 @@ public abstract class AMQDestination imp
dest.setDelete(_delete);
dest.setBrowseOnly(_browseOnly);
dest.setAddressType(_addressType);
- dest.setTargetNode(_targetNode);
- dest.setSourceNode(_sourceNode);
+ dest.setNode(_node);
dest.setLink(_link);
dest.setAddressResolved(_addressResolved.get());
return dest;
@@ -935,6 +952,4 @@ public abstract class AMQDestination imp
{
return _rejectBehaviour;
}
-
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org