You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [20/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.qpid.server.store;
import java.util.ArrayList;
@@ -12,22 +28,6 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
public class OperationalLoggingListenerTest extends TestCase
{
@@ -73,11 +73,11 @@ public class OperationalLoggingListenerT
}
- messageStore.attainState(State.CONFIGURING);
+ messageStore.attainState(State.INITIALISING);
assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size());
assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString());
- messageStore.attainState(State.CONFIGURED);
+ messageStore.attainState(State.INITIALISED);
assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString());
assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString());
@@ -86,7 +86,7 @@ public class OperationalLoggingListenerT
assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString());
}
- messageStore.attainState(State.RECOVERING);
+ messageStore.attainState(State.ACTIVATING);
assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString());
@@ -147,6 +147,12 @@ public class OperationalLoggingListenerT
{
_eventManager.addEventListener(eventListener, events);
}
+
+ @Override
+ public String getStoreType()
+ {
+ return "TEST";
+ }
}
private static class TestActor implements LogActor
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java Fri Aug 3 12:13:32 2012
@@ -45,8 +45,8 @@ public class StateManagerTest extends Te
{
assertEquals(State.INITIAL, _manager.getState());
- _manager.stateTransition(State.INITIAL, State.CONFIGURING);
- assertEquals(State.CONFIGURING, _manager.getState());
+ _manager.attainState(State.INITIALISING);
+ assertEquals(State.INITIALISING, _manager.getState());
}
public void testStateTransitionDisallowed()
@@ -55,7 +55,7 @@ public class StateManagerTest extends Te
try
{
- _manager.stateTransition(State.ACTIVE, State.CLOSING);
+ _manager.attainState(State.CLOSING);
fail("Exception not thrown");
}
catch (IllegalStateException e)
@@ -98,22 +98,29 @@ public class StateManagerTest extends Te
public void testValidStateTransitions()
{
assertEquals(State.INITIAL, _manager.getState());
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.RESTART);
- performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.CLOSE_ACTIVE);
performValidTransition(StateManager.CLOSE_COMPLETE);
+
+ _manager = new StateManager(this);
+ assertEquals(State.INITIAL, _manager.getState());
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
+ performValidTransition(StateManager.CLOSE_INITIALISED);
+ performValidTransition(StateManager.CLOSE_COMPLETE);
_manager = new StateManager(this);
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.CLOSE_QUIESCED);
@@ -132,54 +139,50 @@ public class StateManagerTest extends Te
{
assertEquals(State.INITIAL, _manager.getState());
-
- performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED);
- performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING);
- performInvalidTransitions(StateManager.RECOVER, State.ACTIVE);
- performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING);
+ performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
+ performInvalidTransitions(StateManager.INITALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
+ performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
- performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING);
+ performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);
performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
performInvalidTransitions(StateManager.CLOSE_COMPLETE);
-
-
-
}
- private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions)
+ private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates)
{
if(preTransition != null)
{
performValidTransition(preTransition);
}
- EnumSet<State> nextStates = EnumSet.allOf(State.class);
+ EnumSet<State> endStates = EnumSet.allOf(State.class);
- if(validTransitions != null)
+ if(validEndStates != null)
{
- for(State state: validTransitions)
+ for(State state: validEndStates)
{
- nextStates.remove(state);
+ endStates.remove(state);
}
}
- for(State nextState : nextStates)
+ for(State invalidEndState : endStates)
{
- performInvalidStateTransition(nextState);
+ performInvalidStateTransition(invalidEndState);
}
}
- private void performInvalidStateTransition(State state)
+ private void performInvalidStateTransition(State invalidEndState)
{
try
{
_event = null;
State startState = _manager.getState();
- _manager.attainState(state);
- fail("Invalid state transition performed: " + startState + " to " + state);
+ _manager.attainState(invalidEndState);
+ fail("Invalid state transition performed: " + startState + " to " + invalidEndState);
}
catch(IllegalStateException e)
{
@@ -188,6 +191,7 @@ public class StateManagerTest extends Te
assertNull("No event should have be fired", _event);
}
+ @Override
public void event(Event event)
{
_event = event;
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Fri Aug 3 12:13:32 2012
@@ -1,5 +1,3 @@
-package org.apache.qpid.server.subscription;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,16 +19,24 @@ package org.apache.qpid.server.subscript
*
*/
+package org.apache.qpid.server.subscription;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.qpid.server.stats.StatisticsCounter;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -42,7 +48,7 @@ public class MockSubscription implements
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private AMQQueue.Context _queueContext = null;
+ private volatile AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
@@ -76,19 +82,9 @@ public class MockSubscription implements
_state = State.CLOSED;
}
- public boolean filtersMessages()
- {
- return false;
- }
-
- public AMQChannel getChannel()
- {
- return null;
- }
-
- public AMQShortString getConsumerTag()
+ public String getConsumerName()
{
- return tag;
+ return tag == null ? null : tag.asString();
}
public long getSubscriptionID()
@@ -121,11 +117,36 @@ public class MockSubscription implements
return false;
}
+ public long getBytesOut()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getMessagesOut()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return 0; // TODO - Implement
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return 0; // TODO - Implement
+ }
+
public AMQQueue getQueue()
{
return queue;
}
+ public AMQSessionModel getSessionModel()
+ {
+ return new MockSessionModel();
+ }
+
public boolean trySendLock()
{
return _stateChangeLock.tryLock();
@@ -154,11 +175,6 @@ public class MockSubscription implements
return _isActive ;
}
- public void confirmAutoClose()
- {
-
- }
-
public void set(String key, Object value)
{
}
@@ -173,11 +189,6 @@ public class MockSubscription implements
return false;
}
- public boolean isBrowser()
- {
- return false;
- }
-
public boolean isClosed()
{
return _closed;
@@ -207,10 +218,6 @@ public class MockSubscription implements
_stateChangeLock.unlock();
}
- public void resend(QueueEntry entry) throws AMQException
- {
- }
-
public void onDequeue(QueueEntry queueEntry)
{
}
@@ -232,7 +239,6 @@ public class MockSubscription implements
messages.add(entry);
}
- @Override
public void flushBatched()
{
@@ -249,7 +255,7 @@ public class MockSubscription implements
}
public void setNoLocal(boolean noLocal)
- {
+ {
}
public void setStateListener(StateListener listener)
@@ -285,4 +291,259 @@ public class MockSubscription implements
{
_isActive = isActive;
}
+
+ private static class MockSessionModel implements AMQSessionModel
+ {
+
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return 0;
+ }
+
+ @Override
+ public UUID getQMFId()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQConnectionModel getConnectionModel()
+ {
+ return new MockConnectionModel();
+ }
+
+ @Override
+ public String getClientID()
+ {
+ return null;
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return null;
+ }
+
+ @Override
+ public void checkTransactionStatus(long openWarn, long openClose,
+ long idleWarn, long idleClose) throws AMQException
+ {
+ }
+
+ @Override
+ public void block(AMQQueue queue)
+ {
+ }
+
+ @Override
+ public void unblock(AMQQueue queue)
+ {
+ }
+
+ @Override
+ public void block()
+ {
+ }
+
+ @Override
+ public void unblock()
+ {
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ return false;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public Long getTxnCount()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnStart()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnCommits()
+ {
+ return null;
+ }
+
+ @Override
+ public Long getTxnRejects()
+ {
+ return null;
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ return 0;
+ }
+ }
+
+ private static class MockConnectionModel implements AMQConnectionModel
+ {
+ @Override
+ public void initialiseStatistics()
+ {
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+
+ }
+
+ @Override
+ public void close(AMQConstant cause, String message)
+ throws AMQException
+ {
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause,
+ String message) throws AMQException
+ {
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return 0;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return null;
+ }
+
+ @Override
+ public void block()
+ {
+ }
+
+ @Override
+ public void unblock()
+ {
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return null;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return false;
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return "remoteAddress:1234";
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return null;
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return null;
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return null;
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0;
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java Fri Aug 3 12:13:32 2012
@@ -124,6 +124,12 @@ class MockStoreTransaction implements Tr
storeTransaction.setState(TransactionState.STARTED);
return storeTransaction;
}
+
+ @Override
+ public String getStoreType()
+ {
+ return "TEST";
+ }
};
}
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Fri Aug 3 12:13:32 2012
@@ -20,8 +20,7 @@
*/
package org.apache.qpid.server.util;
-import java.util.UUID;
-
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
@@ -38,6 +37,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -45,7 +45,6 @@ import org.apache.qpid.server.registry.A
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -68,10 +67,10 @@ public class InternalBrokerBaseCase exte
super.setUp();
_configXml.addProperty("virtualhosts.virtualhost.name", "test");
- _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
_configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
createBroker();
}
@@ -85,7 +84,7 @@ public class InternalBrokerBaseCase exte
configure();
- _registry = new TestApplicationRegistry(_configuration);
+ _registry = createApplicationRegistry();
ApplicationRegistry.initialise(_registry);
_registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName());
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost(getName());
@@ -93,7 +92,7 @@ public class InternalBrokerBaseCase exte
QUEUE_NAME = new AMQShortString("test");
// Create a queue on the test Vhost.. this will aid in diagnosing duff tests
// as the ExpiredMessage Task will log with the test Name.
- _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
+ _queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), QUEUE_NAME.asString(), false, "testowner",
false, false, _virtualHost, null);
Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
@@ -102,7 +101,7 @@ public class InternalBrokerBaseCase exte
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()), false, new AMQShortString("testowner"),
+ _queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), getName(), false, "testowner",
false, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(_queue);
@@ -119,6 +118,11 @@ public class InternalBrokerBaseCase exte
_session.addChannel(_channel);
}
+ protected IApplicationRegistry createApplicationRegistry() throws ConfigurationException
+ {
+ return new TestApplicationRegistry(_configuration);
+ }
+
protected void configure()
{
// Allow other tests to override configuration
@@ -250,7 +254,7 @@ public class InternalBrokerBaseCase exte
channel.publishContentHeader(_headerBody);
}
-
+ channel.sync();
}
public void acknowledge(AMQChannel channel, long deliveryTag)
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.util;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.ServerConfiguration;
@@ -28,9 +31,11 @@ import org.apache.qpid.server.logging.Nu
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import java.util.Properties;
@@ -51,11 +56,10 @@ public class TestApplicationRegistry ext
super.initialise();
}
- /**
- * @see org.apache.qpid.server.registry.ApplicationRegistry#createAuthenticationManager()
- */
@Override
- protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
+ protected IAuthenticationManagerRegistry createAuthenticationManagerRegistry(
+ ServerConfiguration _configuration, PluginManager _pluginManager)
+ throws ConfigurationException
{
final Properties users = new Properties();
users.put("guest","guest");
@@ -63,7 +67,7 @@ public class TestApplicationRegistry ext
final PropertiesPrincipalDatabase ppd = new PropertiesPrincipalDatabase(users);
- AuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager()
+ final AuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager()
{
/**
@@ -83,12 +87,35 @@ public class TestApplicationRegistry ext
super.initialise();
}
};
-
pdam.initialise();
- return pdam;
- }
+ return new IAuthenticationManagerRegistry()
+ {
+ @Override
+ public void close()
+ {
+ pdam.close();
+ }
+
+ @Override
+ public AuthenticationManager getAuthenticationManager(
+ SocketAddress address)
+ {
+ return pdam;
+ }
+ @Override
+ public Map<String, AuthenticationManager> getAvailableAuthenticationManagers()
+ {
+ return Collections.singletonMap(pdam.getClass().getName(), pdam);
+ }
+
+ @Override
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ }
+ };
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java Fri Aug 3 12:13:32 2012
@@ -64,4 +64,50 @@ public class HouseKeepingTaskTest extend
//clean up the test actor
CurrentActor.remove();
}
+
+ public void testThreadNameIsSetForDurationOfTask() throws Exception
+ {
+ //create and set a test actor
+ LogActor testActor = new TestLogActor(new NullRootMessageLogger());
+ CurrentActor.set(testActor);
+
+ String originalThreadName = Thread.currentThread().getName();
+
+ String vhostName = "HouseKeepingTaskTestVhost";
+
+ String expectedThreadNameDuringExecution = vhostName + ":" + "ThreadNameRememberingTask";
+
+ ThreadNameRememberingTask testTask = new ThreadNameRememberingTask(new MockVirtualHost(vhostName));
+
+ testTask.run();
+
+ assertEquals("Thread name should have been set during execution", expectedThreadNameDuringExecution, testTask.getThreadNameDuringExecution());
+ assertEquals("Thread name should have been reverted after task has run", originalThreadName, Thread.currentThread().getName());
+
+ //clean up the test actor
+ CurrentActor.remove();
+ }
+
+
+ private static final class ThreadNameRememberingTask extends HouseKeepingTask
+ {
+ private String _threadNameDuringExecution;
+
+ private ThreadNameRememberingTask(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ @Override
+ public void execute()
+ {
+ _threadNameDuringExecution = Thread.currentThread().getName(); // store current thread name so we can assert it later
+ throw new RuntimeException("deliberate exception to check that thread name still gets reverted");
+ }
+
+ public String getThreadNameDuringExecution()
+ {
+ return _threadNameDuringExecution;
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Fri Aug 3 12:13:32 2012
@@ -32,7 +32,6 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -141,11 +140,6 @@ public class MockVirtualHost implements
return 0;
}
- public ManagedObject getManagedObject()
- {
- return null;
- }
-
public MessageStore getMessageStore()
{
return null;
@@ -222,6 +216,12 @@ public class MockVirtualHost implements
return null;
}
+ @Override
+ public UUID getQMFId()
+ {
+ return null;
+ }
+
public ConfiguredObject<VirtualHostConfigType, VirtualHostConfig> getParent()
{
return null;
@@ -257,11 +257,6 @@ public class MockVirtualHost implements
}
- public boolean isStatisticsEnabled()
- {
- return false;
- }
-
public void registerMessageDelivered(long messageSize)
{
@@ -277,14 +272,16 @@ public class MockVirtualHost implements
}
- public void setStatisticsEnabled(boolean enabled)
+ public State getState()
{
+ return State.ACTIVE;
+ }
+ public void block()
+ {
}
- @Override
- public State getState()
+ public void unblock()
{
- return State.ACTIVE;
}
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java Fri Aug 3 12:13:32 2012
@@ -27,7 +27,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -192,7 +192,7 @@ public class VirtualHostImplTest extends
writer.write(" <name>" + vhostName + "</name>");
writer.write(" <" + vhostName + ">");
writer.write(" <store>");
- writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>");
+ writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>");
writer.write(" </store>");
if(exchangeName != null && !dontDeclare)
{
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyTest.java Fri Aug 3 12:13:32 2012
@@ -146,7 +146,7 @@ public class TopicDeletePolicyTest exten
MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new DirectExchange(), null));
policy.performPolicy(queue);
@@ -165,7 +165,7 @@ public class TopicDeletePolicyTest exten
MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new TopicExchange(), null));
queue.setAutoDelete(false);
@@ -186,7 +186,7 @@ public class TopicDeletePolicyTest exten
final MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new TopicExchange(), null));
setQueueToAutoDelete(queue);
@@ -207,7 +207,7 @@ public class TopicDeletePolicyTest exten
MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new TopicExchange(), null));
policy.performPolicy(queue);
@@ -233,7 +233,7 @@ public class TopicDeletePolicyTest exten
MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new TopicExchange(), null));
policy.performPolicy(queue);
@@ -253,7 +253,7 @@ public class TopicDeletePolicyTest exten
MockAMQQueue queue = createOwnedQueue();
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+ queue.addBinding(new Binding(null, null, "bindingKey", queue, new TopicExchange(), null));
policy.performPolicy(queue);
Modified: qpid/branches/asyncstore/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/build.deps?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/build.deps (original)
+++ qpid/branches/asyncstore/java/build.deps Fri Aug 3 12:13:32 2012
@@ -17,41 +17,54 @@
# under the License.
#
-commons-beanutils-core=lib/commons-beanutils-core-1.8.0.jar
-commons-cli=lib/commons-cli-1.0.jar
-commons-codec=lib/commons-codec-1.3.jar
-commons-collections=lib/commons-collections-3.2.jar
-commons-configuration=lib/commons-configuration-1.6.jar
-commons-digester=lib/commons-digester-1.8.1.jar
-commons-lang=lib/commons-lang-2.2.jar
-commons-logging=lib/commons-logging-1.0.4.jar
+commons-beanutils-core=lib/required/commons-beanutils-core-1.8.3.jar
+commons-cli=lib/required/commons-cli-1.2.jar
+commons-codec=lib/required/commons-codec-1.6.jar
+commons-collections=lib/required/commons-collections-3.2.1.jar
+commons-configuration=lib/required/commons-configuration-1.8.jar
+commons-digester=lib/required/commons-digester-1.8.1.jar
+commons-lang=lib/required/commons-lang-2.6.jar
+commons-logging=lib/required/commons-logging-1.1.1.jar
+
+derby-db=lib/required/derby-10.8.2.2.jar
+
+geronimo-jms=lib/required/geronimo-jms_1.1_spec-1.0.jar
+geronimo-j2ee=lib/required/geronimo-j2ee-connector_1.5_spec-2.0.0.jar
+geronimo-jta=lib/required/geronimo-jta_1.1_spec-1.1.1.jar
+geronimo-kernel=lib/required/geronimo-kernel-2.2.1.jar
+geronimo-openejb=lib/required/geronimo-ejb_3.0_spec-1.0.1.jar
+geronimo-servlet=lib/required/geronimo-servlet_2.5_spec-1.2.jar
+
+junit=lib/required/junit-3.8.1.jar
+mockito-all=lib/required/mockito-all-1.9.0.jar
+
+gson-all=lib/required/gson-2.0.jar
+
+log4j=lib/required/log4j-1.2.16.jar
+
+slf4j-api=lib/required/slf4j-api-1.6.4.jar
+slf4j-log4j=lib/required/slf4j-log4j12-1.6.4.jar
+
+xalan=lib/required/xalan-2.7.0.jar
+
+jetty=lib/required/jetty-server-7.6.3.v20120416.jar
+jetty-continuation=lib/required/jetty-continuation-7.6.3.v20120416.jar
+jetty-security=lib/required/jetty-security-7.6.3.v20120416.jar
+jetty-util=lib/required/jetty-util-7.6.3.v20120416.jar
+jetty-io=lib/required/jetty-io-7.6.3.v20120416.jar
+jetty-http=lib/required/jetty-http-7.6.3.v20120416.jar
+jetty-servlet=lib/required/jetty-servlet-7.6.3.v20120416.jar
+jetty-websocket=lib/required/jetty-websocket-7.6.3.v20120416.jar
+servlet-api=${geronimo-servlet}
-derby-db=lib/derby-10.6.1.0.jar
+dojo=lib/required/dojo-war-1.7.2.war
-geronimo-jms=lib/geronimo-jms_1.1_spec-1.0.jar
-geronimo-j2ee=lib/geronimo-j2ee-connector_1.5_spec-2.0.0.jar
-geronimo-jta=lib/geronimo-jta_1.1_spec-1.1.1.jar
-geronimo-kernel=lib/geronimo-kernel-2.2.1.jar
-geronimo-openejb=lib/geronimo-ejb_3.0_spec-1.0.1.jar
-
-junit=lib/junit-3.8.1.jar
-mockito-all=lib/mockito-all-1.9.0.jar
-
-gson-all=lib/gson-2.0.jar
-
-log4j=lib/log4j-1.2.12.jar
-
-slf4j-api=lib/slf4j-api-1.6.1.jar
-slf4j-log4j=lib/slf4j-log4j12-1.6.1.jar
-
-xalan=lib/xalan-2.7.0.jar
-
-felix-main=lib/org.apache.felix.main-2.0.5.jar
+felix-main=lib/required/org.apache.felix.main-2.0.5.jar
felix.libs=${felix-main}
-jackson-core=lib/jackson-core-asl-1.9.0.jar
-jackson-mapper=lib/jackson-mapper-asl-1.9.0.jar
+jackson-core=lib/required/jackson-core-asl-1.9.0.jar
+jackson-mapper=lib/required/jackson-mapper-asl-1.9.0.jar
commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \
${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration}
@@ -64,72 +77,17 @@ amqp-1-0-client-jms.libs=${geronimo-jms}
tools.libs=${commons-configuration.libs} ${log4j}
broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} \
- ${jackson-core} ${jackson-mapper}
+ ${jackson-core} ${jackson-mapper} ${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jetty-websocket}
+broker-plugins-management-http.libs=${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jackson-core} ${jackson-mapper}
broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs}
test.libs=${slf4j-log4j} ${log4j} ${junit} ${slf4j-api} ${mockito-all}
-perftests.libs=${geronimo-jms} ${slf4j-api} ${commons-collections} ${commons-beanutils-core} ${commons-lang} ${gson-all}
-
-ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar
-ecl-core-jface=lib/org.eclipse.jface_3.4.1.M20080827-2000.jar
-ecl-core-jface-databinding=lib/org.eclipse.jface.databinding_1.2.1.M20080827-0800a.jar
-ecl-core-commands=lib/org.eclipse.core.commands_3.4.0.I20080509-2000.jar
-ecl-core-contenttype=lib/org.eclipse.core.contenttype_3.3.0.v20080604-1400.jar
-ecl-core-databinding=lib/org.eclipse.core.databinding_1.1.1.M20080827-0800b.jar
-ecl-core-expressions=lib/org.eclipse.core.expressions_3.4.0.v20080603-2000.jar
-ecl-core-jobs=lib/org.eclipse.core.jobs_3.4.0.v20080512.jar
-ecl-core-runtime=lib/org.eclipse.core.runtime_3.4.0.v20080512.jar
-ecl-core-runtime-compat-registry=lib/org.eclipse.core.runtime.compatibility.registry_3.2.200.v20080610/**
-ecl-equinox-app=lib/org.eclipse.equinox.app_1.1.0.v20080421-2006.jar
-ecl-equinox-common=lib/org.eclipse.equinox.common_3.4.0.v20080421-2006.jar
-ecl-equinox-launcher=lib/org.eclipse.equinox.launcher_1.0.101.R34x_v20080819.jar
-ecl-equinox-prefs=lib/org.eclipse.equinox.preferences_3.2.201.R34x_v20080709.jar
-ecl-equinox-registry=lib/org.eclipse.equinox.registry_3.4.0.v20080516-0950.jar
-ecl-help=lib/org.eclipse.help_3.3.101.v20080702_34x.jar
-ecl-osgi=lib/org.eclipse.osgi_3.4.2.R34x_v20080826-1230.jar
-ecl-swt=lib/org.eclipse.swt_3.4.1.v3449c.jar
-ecl-ui=lib/org.eclipse.ui_3.4.1.M20080910-0800.jar
-ecl-ui-forms=lib/org.eclipse.ui.forms_3.3.101.v20080708_34x.jar
-ecl-ui-workbench=lib/org.eclipse.ui.workbench_3.4.1.M20080827-0800a.jar
-apache-commons-codec=lib/org.apache.commons.codec_1.3.0.v20080530-1600.jar
-
-ecl-swt-win32-win32-x86=lib/org.eclipse.swt.win32.win32.x86_3.4.1.v3449c.jar
-ecl-equinox-launcher-win32-win32-x86=lib/org.eclipse.equinox.launcher.win32.win32.x86_1.0.101.R34x_v20080731/**
-ecl-swt-linux-gtk-x86=lib/org.eclipse.swt.gtk.linux.x86_3.4.1.v3449c.jar
-ecl-equinox-launcher-linux-gtk-x86=lib/org.eclipse.equinox.launcher.gtk.linux.x86_1.0.101.R34x_v20080805/**
-ecl-swt-linux-gtk-x86_64=lib/org.eclipse.swt.gtk.linux.x86_64_3.4.1.v3449c.jar
-ecl-equinox-launcher-linux-gtk-x86_64=lib/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.0.101.R34x_v20080731/**
-ecl-swt-macosx-carbon=lib/org.eclipse.swt.carbon.macosx_3.4.1.v3449c.jar
-ecl-equinox-launcher-macosx-carbon=lib/org.eclipse.equinox.launcher.carbon.macosx_1.0.101.R34x_v20080731/**
-ecl-swt-solaris-gtk-sparc=lib/org.eclipse.swt.gtk.solaris.sparc_3.4.1.v3449c.jar
-ecl-equinox-launcher-solaris-gtk-sparc=lib/org.eclipse.equinox.launcher.gtk.solaris.sparc_1.0.101.R34x_v20080731/**
+perftests.libs=${geronimo-jms} ${slf4j-api} ${log4j} ${slf4j-log4j} ${commons-logging} ${commons-collections} ${commons-beanutils-core} ${commons-lang} ${gson-all}
management-common.libs=
-management-eclipse-plugin.core-libs=${ibm-icu} ${ecl-core-jface} ${ecl-core-jface-databinding} \
- ${ecl-core-commands} ${ecl-core-contenttype} ${ecl-core-databinding} ${ecl-core-expressions} \
- ${ecl-core-jobs} ${ecl-core-runtime} ${ecl-equinox-app} ${ecl-equinox-common} ${ecl-equinox-launcher} \
- ${ecl-equinox-prefs} ${ecl-equinox-registry} ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} \
- ${ecl-ui-workbench} ${apache-commons-codec}
-
-management-eclipse-plugin.swt-libs=${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon} \
- ${ecl-swt-linux-gtk-x86_64} ${ecl-swt-solaris-gtk-sparc}
-
-management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.swt-libs}
-
-management-eclipse-plugin-win32-win32-x86.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-win32-win32-x86} ${ecl-equinox-launcher-win32-win32-x86} ${ecl-core-runtime-compat-registry}
-management-eclipse-plugin-linux-gtk-x86.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-linux-gtk-x86} ${ecl-equinox-launcher-linux-gtk-x86} ${ecl-core-runtime-compat-registry}
-management-eclipse-plugin-linux-gtk-x86_64.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64} ${ecl-core-runtime-compat-registry}
-management-eclipse-plugin-macosx.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-macosx-carbon} ${ecl-equinox-launcher-macosx-carbon} ${ecl-core-runtime-compat-registry}
-management-eclipse-plugin-solaris-gtk-sparc.libs=${management-eclipse-plugin.core-libs} \
- ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc} ${ecl-core-runtime-compat-registry}
-
common.test.libs=${test.libs}
broker.test.libs=${test.libs}
client.test.libs=${test.libs}
@@ -139,27 +97,30 @@ testkit.test.libs=${test.libs}
systests.libs=${test.libs}
perftests.test.libs=${test.libs}
-broker-plugins.test.libs=${test.libs}
+broker-plugins.test.libs=${test.libs}
-management-eclipse-plugin.test.libs=${test.libs}
management-common.test.libs=${test.libs}
# JCA Resource adapter
-ra.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${test.libs} ${geronimo-kernel} ${geronimo-openejb}
-ra.test.libs=${test.libs}
-
-#note this is a hack because systests is looking for 'jca' versus 'ra' we need to do this twice
-jca.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${test.libs} ${geronimo-kernel} ${geronimo-openejb}
+jca.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${test.libs} ${geronimo-kernel} ${geronimo-openejb} ${geronimo-servlet}
jca.test.libs=${test.libs}
# optional bdbstore module deps
-bdb-je=lib/bdbstore/je-5.0.34.jar
+bdb-je=lib/bdbstore/je-5.0.55.jar
bdbstore.libs=${bdb-je}
bdbstore.test.libs=${test.libs}
+bdbstore-jmx.libs=${bdb-je}
+bdbstore-jmx.test.libs=${test.libs}
+
# optional perftests-visualisation-jfc module deps
jfreechart.jar=lib/jfree/jfreechart-1.0.13.jar
-jfreecommon.jar=lib/jfree/jfreecommon-1.0.16.jar
+jcommon.jar=lib/jfree/jcommon-1.0.16.jar
csvjdbc.jar=lib/csvjdbc/csvjdbc-1.0.8.jar
-perftests-visualisation-jfc.libs=${jfreechart.jar} ${jfreecommon.jar} ${csvjdbc.jar}
+perftests-visualisation-jfc.libs=${jfreechart.jar} ${jcommon.jar} ${csvjdbc.jar}
perftests-visualisation-jfc.test.libs=${test.libs}
+
+# Libraries used only within the build
+bnd=lib/required/bnd-0.0.384.jar
+jython=lib/required/jython-standalone-2.5.2.jar
+maven-ant-tasks=lib/required/maven-ant-tasks-2.1.1.jar
Modified: qpid/branches/asyncstore/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/build.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/build.xml (original)
+++ qpid/branches/asyncstore/java/build.xml Fri Aug 3 12:13:32 2012
@@ -18,29 +18,35 @@
- under the License.
-
-->
-<project name="AMQ Java" default="build">
+<project name="AMQ Java" xmlns:ivy="antlib:org.apache.ivy.ant" default="build">
<import file="common.xml"/>
<findSubProjects name="broker-plugins" dir="broker-plugins"/>
<findSubProjects name="client-plugins" dir="client-plugins"/>
- <findSubProjects name="management" dir="management" excludes="common,example"/>
+
+ <property name="optional" value="false"/>
+ <property name="modules.opt.default" value="bdbstore bdbstore/jmx perftests/visualisation-jfc"/>
+ <condition property="modules.opt" value="" else="${modules.opt.default}">
+ <isfalse value="${optional}"/>
+ </condition>
<property name="modules.core" value="common management/common amqp-1-0-common broker client amqp-1-0-client amqp-1-0-client-jms tools"/>
<property name="modules.examples" value="client/example management/example"/>
<property name="modules.tests" value="systests perftests"/>
- <property name="modules.management" value="${management}"/>
<property name="modules.plugin" value="${broker-plugins} ${client-plugins}"/>
- <property name="modules.opt" value=""/>
<property name="modules.jca" value="jca"/>
<property name="modules" value="${modules.core} ${modules.examples}
- ${modules.management} ${modules.jca} ${modules.tests} ${modules.plugin} ${modules.opt}"/>
+ ${modules.jca} ${modules.tests} ${modules.plugin} ${modules.opt}"/>
<property name="qpid.jar" location="${build.lib}/qpid-all.jar"/>
<basename property="qpid.jar.name" file="${qpid.jar}"/>
<property name="resources" value="${project.root}/resources"/>
+ <!-- Modules for which coverage will be created by the cover-test target -->
+ <property name="coverage.modules" value="${modules}"/>
+
<map property="release.excludes" value="${modules}">
<globmapper from="*" to="*/\*\*"/>
</map>
@@ -156,7 +162,7 @@
<touch file="${qpid.jar}"/>
</target>
- <target name="build" description="build the project">
+ <target name="build" depends="retrieve-dependencies" description="build the project">
<iterate target="build"/>
<antcall target="manifest"/>
</target>
@@ -212,33 +218,39 @@
</target>
<target name="coverage-report" description="generate coverage report" depends="cobertura-init">
- <cobertura-merge datafile="${build.coveragereport}/cobertura.ser">
+ <mkdir dir="${build.coverage.report}" />
+ <mkdir dir="${build.coverage.src}" />
+
+ <cobertura-merge datafile="${build.coverage.report}/cobertura.ser">
<!-- merge all module coverage reports -->
<fileset dir="${build}">
<include name="**/*.ser"/>
</fileset>
</cobertura-merge>
+
+ <!-- Copy all covered sources to single directory for cobertura report -->
+
+ <foreach property="module" list="${coverage.modules}">
+
+ <copy todir="${build.coverage.src}">
+ <fileset dir="${module}/src/main/java" includes="**/*.java"/>
+ </copy>
+
+ <copy todir="${build.coverage.src}">
+ <fileset dir="build/scratch/${module}/src" includes="**/*.java"/>
+ </copy>
+ </foreach>
+
<cobertura-report format="xml"
- destdir="${build.coveragereport}"
- datafile="${build.coveragereport}/cobertura.ser"
- >
- <fileset dir="${project.root}/common/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/common/src" includes="**/*.java" />
- <fileset dir="${project.root}/broker/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/broker/src" includes="**/*.java" />
- <fileset dir="${project.root}/client/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/client/src" includes="**/*.java" />
+ destdir="${build.coverage.report}"
+ datafile="${build.coverage.report}/cobertura.ser">
+ <fileset dir="${build.coverage.src}" includes="**/*.java" />
</cobertura-report>
- <cobertura-report format="html"
- destdir="${build.coveragereport}"
- datafile="${build.coveragereport}/cobertura.ser"
- >
- <fileset dir="${project.root}/common/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/common/src" includes="**/*.java" />
- <fileset dir="${project.root}/broker/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/broker/src" includes="**/*.java" />
- <fileset dir="${project.root}/client/src/main/java" includes="**/*.java" />
- <fileset dir="${project.root}/build/scratch/client/src" includes="**/*.java" />
+
+ <cobertura-report format="html"
+ destdir="${build.coverage.report}"
+ datafile="${build.coverage.report}/cobertura.ser">
+ <fileset dir="${build.coverage.src}" includes="**/*.java" />
</cobertura-report>
</target>
@@ -247,7 +259,7 @@
</target>
<target name="cover-test" description="run tests and generate coverage information" depends="build">
- <iterate target="cover-test" modules="broker client common"/>
+ <iterate target="cover-test" modules="${coverage.modules}"/>
</target>
<target name="test-interop" depends="build,compile-tests"
@@ -270,7 +282,7 @@
</jython>
</target>
- <target name="findbugs" depends="findbugs-init,build">
+ <target name="findbugs" depends="build,findbugs-init">
<mkdir dir="${build.findbugs}"/>
@@ -280,13 +292,15 @@
stylesheet="fancy-hist.xsl"
jvmargs="-Xmx512m"
projectName="Qpid">
- <auxAnalyzePath>
- <fileset dir="${build.lib}" includes="qpid*.jar" excludes="*test*.jar *junit*.jar *example*.jar qpid-all.jar"/>
- </auxAnalyzePath>
+ <!-- The classes for analysis -->
+ <fileset dir="${build.lib}" includes="qpid*.jar" excludes="*test*.jar *-sources*.jar *example*.jar qpid-all.jar"/>
+
+ <!--Additional classes, not for analysis -->
<auxClassPath>
- <fileset dir="${build.lib}" includes="**/*.jar" />
- <fileset dir="${basedir}/lib" includes="org.eclipse*.jar com.ibm.*.jar"/>
+ <fileset dir="${basedir}/lib" includes="**/*.jar"/>
</auxClassPath>
+
+ <!-- Source, used for improved reporting -->
<sourcePath>
<fileset dir="${basedir}" includes="**/src/**/org/.." />
</sourcePath>
@@ -296,4 +310,22 @@
<target name="eclipse" description="build eclipse project and classpath files">
<iterate target="eclipse"/>
</target>
+
+ <!-- check the following properties which must be specified by the user-->
+ <target name="check-upload-props-exist" description="check that the required properties have been set">
+ <fail unless="nexus.user" message="You must supply the 'nexus.user' property"/>
+ <fail unless="nexus.password" message="You must supply the 'nexus.password' property"/>
+ <fail unless="maven.artifact.dir" message="You must supply the 'maven.artifact.dir' property"/>
+ </target>
+
+ <target name="perform-nexus-upload">
+ <ivy:configure file="ivysettings.nexus.xml"/>
+ <ivy:resolve file="ivy.nexus.xml"/>
+ <ivy:deliver/>
+ <ivy:publish publishivy="false" resolver="nexus"
+ artifactspattern="${maven.artifact.dir}/[organisation]/[module]/[artifact]/[revision]/[artifact]-[revision](-[classifier]).[ext]"/>
+ </target>
+
+ <target name="upload" depends="load-ivy, check-upload-props-exist, perform-nexus-upload"/>
+
</project>
Modified: qpid/branches/asyncstore/java/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/build.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/build.xml (original)
+++ qpid/branches/asyncstore/java/client/build.xml Fri Aug 3 12:13:32 2012
@@ -61,4 +61,5 @@
<target name="release-bin" depends="release-bin-tasks"/>
<target name="bundle" depends="bundle-tasks"/>
+
</project>
Modified: qpid/branches/asyncstore/java/client/src/main/java/client.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/client.bnd?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/client.bnd (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/client.bnd Fri Aug 3 12:13:32 2012
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.17.0
+ver: 0.19.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Aug 3 12:13:32 2012
@@ -69,6 +69,7 @@ import javax.naming.StringRefAddr;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
@@ -78,11 +79,14 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
+ private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private final long _connectionNumber;
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -222,6 +226,13 @@ public class AMQConnection extends Close
throw new IllegalArgumentException("Connection must be specified");
}
+ _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
+ }
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -308,11 +319,6 @@ public class AMQConnection extends Close
_delegate = new AMQConnectionDelegate_0_10(this);
}
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Connection:" + connectionURL);
- }
-
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
@@ -1519,4 +1525,18 @@ public class AMQConnection extends Close
{
return _delegate;
}
+
+ public Long getConnectionNumber()
+ {
+ return _connectionNumber;
+ }
+
+ protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress)
+ {
+ if(_logger.isInfoEnabled())
+ {
+ _logger.info("Connection " + _connectionNumber + " now connected from "
+ + localAddress + " to " + remoteAddress);
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Aug 3 12:13:32 2012
@@ -222,6 +222,7 @@ public class AMQConnectionDelegate_0_10
_conn.setUsername(_qpidConnection.getUserID());
_conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
_conn.getFailoverPolicy().attainedConnection();
+ _conn.logConnected(_qpidConnection.getLocalAddress(), _qpidConnection.getRemoteAddress());
}
catch (ProtocolVersionException pe)
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Aug 3 12:13:32 2012
@@ -24,13 +24,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.BasicQosBody;
@@ -68,7 +66,6 @@ public class AMQConnectionDelegate_8_0 i
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
_conn.getProtocolHandler().closeConnection(timeout);
@@ -110,9 +107,11 @@ public class AMQConnectionDelegate_8_0 i
sslContext = SSLContextFactory.buildClientContext(
settings.getTrustStorePath(),
settings.getTrustStorePassword(),
+ settings.getTrustStoreType(),
settings.getTrustManagerFactoryAlgorithm(),
settings.getKeyStorePath(),
settings.getKeyStorePassword(),
+ settings.getKeyStoreType(),
settings.getKeyManagerFactoryAlgorithm(),
settings.getCertAlias());
}
@@ -137,6 +136,7 @@ public class AMQConnectionDelegate_8_0 i
{
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
+ _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
return null;
}
else
@@ -283,7 +283,14 @@ public class AMQConnectionDelegate_8_0 i
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
+
+ // reset the flow control flag
+ // on opening channel, broker sends flow blocked if virtual host is blocked
+ // if virtual host is not blocked, then broker does not send flow command
+ // that's why we need to reset the flow control flag
+ s.setFlowControl(true);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
+
s.resubscribe();
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Fri Aug 3 12:13:32 2012
@@ -55,12 +55,13 @@ public class AMQConnectionFactory implem
ObjectFactory, Referenceable, XATopicConnectionFactory,
XAQueueConnectionFactory, XAConnectionFactory
{
- private final ConnectionURL _connectionDetails;
+ protected static final String NO_URL_CONFIGURED = "The connection factory wasn't created with a proper URL, the connection details are empty";
+
+ private ConnectionURL _connectionDetails;
// The default constructor is necessary to allow AMQConnectionFactory to be deserialised from JNDI
public AMQConnectionFactory()
{
- _connectionDetails = null;
}
public AMQConnectionFactory(final String url) throws URLSyntaxException
@@ -106,6 +107,11 @@ public class AMQConnectionFactory implem
public Connection createConnection() throws JMSException
{
+ if(_connectionDetails == null)
+ {
+ throw new JMSException(NO_URL_CONFIGURED);
+ }
+
try
{
if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
@@ -158,7 +164,7 @@ public class AMQConnectionFactory implem
}
else
{
- throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty");
+ throw new JMSException(NO_URL_CONFIGURED);
}
}
@@ -193,6 +199,12 @@ public class AMQConnectionFactory implem
return _connectionDetails.toString();
}
+ //setter necessary to use instances created with the default constructor (which we can't remove)
+ public final void setConnectionURLString(String url) throws URLSyntaxException
+ {
+ _connectionDetails = new AMQConnectionURL(url);
+ }
+
/**
* JNDI interface to create objects from References.
*
@@ -332,7 +344,7 @@ public class AMQConnectionFactory implem
}
else
{
- throw new JMSException("The connection factory wasn't created with a proper URL, the connection details are empty");
+ throw new JMSException(NO_URL_CONFIGURED);
}
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,18 +122,17 @@ public abstract class AMQSession<C exten
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
- public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
-
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
- private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+ private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+ DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
/**
* The period to wait while flow controlled before declaring a failure
*/
- private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
+ private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
private final boolean _delareQueues =
@@ -797,11 +801,8 @@ public abstract class AMQSession<C exten
if (e instanceof AMQDisconnectedException)
{
- if (_dispatcherThread != null)
- {
- // Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcherThread.interrupt();
- }
+ // Failover failed and ain't coming back. Knife the dispatcher.
+ stopDispatcherThread();
}
@@ -830,6 +831,13 @@ public abstract class AMQSession<C exten
}
}
+ protected void stopDispatcherThread()
+ {
+ if (_dispatcherThread != null)
+ {
+ _dispatcherThread.interrupt();
+ }
+ }
/**
* Commits all messages done in this transaction and releases any locks currently held.
*
@@ -2366,7 +2374,10 @@ public abstract class AMQSession<C exten
{
throw new Error("Error creating Dispatcher thread",e);
}
- _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+
+ String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
+
+ _dispatcherThread.setName(dispatcherThreadName);
_dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
@@ -3130,6 +3141,14 @@ public abstract class AMQSession<C exten
_ticket = ticket;
}
+ public boolean isFlowBlocked()
+ {
+ synchronized (_flowControl)
+ {
+ return !_flowControl.getFlowControl();
+ }
+ }
+
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Aug 3 12:13:32 2012
@@ -128,7 +128,8 @@ public class AMQSession_0_10 extends AMQ
* Used to store the range of in tx messages
*/
private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
- private int _txSize = 0;
+ private int _txSize = 0;
+ private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour");
//--- constructors
/**
@@ -390,11 +391,7 @@ public class AMQSession_0_10 extends AMQ
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
- if (flushTask != null)
- {
- flushTask.cancel();
- flushTask = null;
- }
+ cancelTimerTask();
flushAcknowledgments();
try
{
@@ -1051,9 +1048,22 @@ public class AMQSession_0_10 extends AMQ
{
code = ee.getErrorCode().getValue();
}
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause());
_currentException = amqe;
}
+ if (!_isHardError)
+ {
+ cancelTimerTask();
+ stopDispatcherThread();
+ try
+ {
+ closed(_currentException);
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Error closing session", e);
+ }
+ }
getAMQConnection().exceptionReceived(_currentException);
}
@@ -1408,5 +1418,19 @@ public class AMQSession_0_10 extends AMQ
sync();
}
+ @Override
+ public boolean isFlowBlocked()
+ {
+ return _qpidSession.isFlowBlocked();
+ }
+
+ private void cancelTimerTask()
+ {
+ if (flushTask != null)
+ {
+ flushTask.cancel();
+ flushTask = null;
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Fri Aug 3 12:13:32 2012
@@ -17,8 +17,13 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.qpid.dtx.XidImpl;
import org.apache.qpid.transport.DtxXaStatus;
@@ -28,15 +33,13 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RecoverResult;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.XaResult;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.njms.XAResource.
*/
-public class XAResourceImpl implements XAResource
+public class XAResourceImpl implements AMQXAResource
{
/**
* this XAResourceImpl's logger
@@ -57,9 +60,11 @@ public class XAResourceImpl implements X
* The time for this resource
*/
private int _timeout;
-
+
//--- constructor
-
+
+ private List<XAResource> _siblings = new ArrayList<XAResource>();
+
/**
* Create an XAResource associated with a XASession
*
@@ -157,7 +162,21 @@ public class XAResourceImpl implements X
_xaSession.createSession();
convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
+
checkStatus(result.getStatus());
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Calling end for " + _siblings.size() + " XAResource siblings");
+ }
+
+ for(XAResource sibling: _siblings)
+ {
+
+ sibling.end(xid, flag);
+ }
+
+ _siblings.clear();
}
@@ -216,28 +235,38 @@ public class XAResourceImpl implements X
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
+ {
if(this == xaResource)
{
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
- {
- return false;
- }
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
-
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
+ return true;
+ }
+
+ if(!(xaResource instanceof AMQXAResource))
+ {
+ return false;
+ }
+
+ String myUUID = getBrokerUUID();
+ String otherUUID = ((AMQXAResource)xaResource).getBrokerUUID();
+
if(_logger.isDebugEnabled())
{
_logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
}
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+
+ boolean isSameRm = (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
+ if(isSameRm)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("XAResource " + xaResource + " is from the ResourceManager. Adding XAResource as sibling for AMQP protocol support. ");
+ }
+ _siblings.add(xaResource);
+ }
+
+ return isSameRm;
+
}
/**
@@ -369,12 +398,12 @@ public class XAResourceImpl implements X
{
_timeout = timeout;
if (timeout != _timeout && _xid != null)
- {
+ {
setDtxTimeout(_timeout);
}
return true;
}
-
+
private void setDtxTimeout(int timeout) throws XAException
{
_xaSession.getQpidSession()
@@ -437,18 +466,23 @@ public class XAResourceImpl implements X
{
setDtxTimeout(_timeout);
}
+
+ for(XAResource sibling: _siblings)
+ {
+ sibling.start(xid, flag);
+ }
}
/**
* Is this resource currently enlisted in a transaction?
- *
+ *
* @return true if the resource is associated with a transaction, false otherwise.
*/
public boolean isEnlisted()
{
return (_xid != null) ;
}
-
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
@@ -517,7 +551,7 @@ public class XAResourceImpl implements X
}
catch (XAException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
throw e;
}
case ILLEGAL_STATE:
@@ -544,7 +578,7 @@ public class XAResourceImpl implements X
* convert a generic xid into qpid format
* @param xid xid to be converted
* @return the qpid formated xid
- * @throws XAException when xid is null
+ * @throws XAException when xid is null
*/
private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
{
@@ -556,4 +590,13 @@ public class XAResourceImpl implements X
return XidImpl.convert(xid);
}
+ public String getBrokerUUID()
+ {
+ return ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ }
+
+ public List<XAResource> getSiblings()
+ {
+ return Collections.unmodifiableList(_siblings);
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Fri Aug 3 12:13:32 2012
@@ -17,6 +17,7 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.transport.RangeSet;
@@ -31,7 +32,7 @@ import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
/**
- * This is an implementation of the javax.njms.XASEssion interface.
+ * This is an implementation of the javax.jms.XASession interface.
*/
public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopicSession, XAQueueSession
{
@@ -67,7 +68,7 @@ public class XASessionImpl extends AMQSe
{
this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
defaultPrefetchHigh, defaultPrefetchLow, null);
-
+
}
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
@@ -92,9 +93,6 @@ public class XASessionImpl extends AMQSe
_qpidDtxSession.dtxSelect();
}
-
- // javax.njms.XASEssion API
-
/**
* Gets the session associated with this XASession.
*
@@ -192,4 +190,11 @@ public class XASessionImpl extends AMQSe
super.acknowledgeImpl() ;
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ super.resubscribe();
+ createSession();
+ }
}
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.handler;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.handler;
* under the License.
*
*/
+package org.apache.qpid.client.handler;
import org.slf4j.Logger;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client.handler;
import org.slf4j.Logger;
@@ -8,26 +28,6 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowBody;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody>
{
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org