You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/06/19 11:02:01 UTC
svn commit: r669431 [1/2] - in /incubator/qpid/trunk/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/ broker/src/main/grammar/ bro...
Author: rgodfrey
Date: Thu Jun 19 02:01:59 2008
New Revision: 669431
URL: http://svn.apache.org/viewvc?rev=669431&view=rev
Log:
QPID-950 : Broker refactoring, copied / merged from branch
Added:
incubator/qpid/trunk/qpid/java/broker/src/
- copied from r662382, incubator/qpid/branches/broker-queue-refactor/java/broker/src/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
Removed:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionSetTest.java
Modified:
incubator/qpid/trunk/qpid/java/010ExcludeList
incubator/qpid/trunk/qpid/java/010ExcludeList-store
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
incubator/qpid/trunk/qpid/java/systests/pom.xml
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Thu Jun 19 02:01:59 2008
@@ -46,4 +46,8 @@
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
// the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
Modified: incubator/qpid/trunk/qpid/java/010ExcludeList-store
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList-store?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList-store (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList-store Thu Jun 19 02:01:59 2008
@@ -41,4 +41,9 @@
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
// the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+
Modified: incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Thu Jun 19 02:01:59 2008
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -34,7 +36,7 @@
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -191,7 +193,7 @@
return false;
}
- public void route(AMQMessage payload) throws AMQException
+ public void route(IncomingMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
@@ -201,17 +203,14 @@
headers.put(key, value);
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-
- payload.enqueue(q);
+
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(q);
+ payload.enqueue(queues);
}
- @Override
- public Map<AMQShortString, List<AMQQueue>> getBindings() {
- // TODO Auto-generated method stub
- return null;
- }
-
+
public boolean isBound(AMQShortString routingKey, FieldTable arguments,
AMQQueue queue) {
// TODO Auto-generated method stub
Modified: incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Thu Jun 19 02:01:59 2008
@@ -1,24 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
package org.apache.qpid.extras.exchanges.example;
import java.util.List;
@@ -28,7 +7,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -102,7 +81,7 @@
{
}
- public void route(AMQMessage message) throws AMQException
+ public void route(IncomingMessage message) throws AMQException
{
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj Thu Jun 19 02:01:59 2008
@@ -94,7 +94,7 @@
return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+ throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql,e);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Jun 19 02:01:59 2008
@@ -67,7 +67,7 @@
if (exchType == null)
{
- throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
+ throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete);
return e;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Thu Jun 19 02:01:59 2008
@@ -101,7 +101,7 @@
else if (!exchange.getType().equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java?rev=669431&r1=662382&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java Thu Jun 19 02:01:59 2008
@@ -1,37 +1,37 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.access;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
-
-public enum Permission
-{
- CONSUME,
- PUBLISH,
- CREATE,
- ACCESS,
- BIND,
- UNBIND,
- DELETE,
- PURGE
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public enum Permission
+{
+ CONSUME,
+ PUBLISH,
+ CREATE,
+ ACCESS,
+ BIND,
+ UNBIND,
+ DELETE,
+ PURGE
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Jun 19 02:01:59 2008
@@ -1014,6 +1014,7 @@
}
}
+
/**
* Declares the named queue.
*
@@ -1031,18 +1032,40 @@
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive) throws AMQException
{
+ createQueue(name, autoDelete, durable, exclusive, null);
+ }
+
+
+ /**
+ * Declares the named queue.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the queue to declare.
+ * @param autoDelete
+ * @param durable Flag to indicate that the queue is durable.
+ * @param exclusive Flag to indicate that the queue is exclusive to this client.
+ * @param arguments Arguments used to set special properties of the queue
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive, final Map<String, Object> arguments) throws AMQException
+ {
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendCreateQueue(name, autoDelete, durable, exclusive);
+ sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
return null;
}
}, _connection).execute();
}
public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive)throws AMQException, FailoverException;
+ final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException;
/**
* Creates a QueueReceiver
*
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Jun 19 02:01:59 2008
@@ -243,13 +243,14 @@
* @param durable If set when creating a new queue,
* the queue will be marked as durable.
* @param exclusive Exclusive queues can only be used from one connection at a time.
+ * @param arguments Exclusive queues can only be used from one connection at a time.
* @throws AMQException
* @throws FailoverException
*/
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive) throws AMQException, FailoverException
+ final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
{
- getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
+ getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
// We need to sync so that we get notify of an error.
@@ -594,7 +595,7 @@
try
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
- sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+ sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null);
sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Jun 19 02:01:59 2008
@@ -40,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public final class AMQSession_0_8 extends AMQSession
{
@@ -125,10 +127,19 @@
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
}
- public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+ FieldTable table = null;
+ if(arguments != null && !arguments.isEmpty())
+ {
+ table = new FieldTable();
+ for(Map.Entry<String, Object> entry : arguments.entrySet())
+ {
+ table.setObject(entry.getKey(), entry.getValue());
+ }
+ }
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
AMQFrame queueDeclare = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -412,6 +423,28 @@
return subscriber;
}
+
+
+
+ public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ {
+ new FailoverRetrySupport<Object, AMQException>(
+ new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Thu Jun 19 02:01:59 2008
@@ -206,16 +206,27 @@
MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ int msg;
+ for (msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive(3000) != null);
+
+ final Message message = _consumer1.receive(1000);
+ if(message == null)
+ {
+ break;
+ }
+
}
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ _consumer1.close();
+ _clientSession1.close();
+
+ for (; msg < MSG_COUNT ; msg++)
{
- assertTrue(consumer2.receive(3000) != null);
+ assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null);
}
+
}
else
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java Thu Jun 19 02:01:59 2008
@@ -62,7 +62,6 @@
private static final class FixedSizeByteBuffer extends ByteBuffer
{
private java.nio.ByteBuffer buf;
- private int refCount = 1;
private int mark = -1;
@@ -70,36 +69,14 @@
{
this.buf = buf;
buf.order( ByteOrder.BIG_ENDIAN );
- refCount = 1;
}
public synchronized void acquire()
{
- if( refCount <= 0 )
- {
- throw new IllegalStateException( "Already released buffer." );
- }
-
- refCount ++;
}
public void release()
{
- synchronized( this )
- {
- if( refCount <= 0 )
- {
- refCount = 0;
- throw new IllegalStateException(
- "Already released buffer. You released the buffer too many times." );
- }
-
- refCount --;
- if( refCount > 0)
- {
- return;
- }
- }
}
public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@
{
if( newCapacity > capacity() )
{
- // Allocate a new buffer and transfer all settings to it.
- int pos = position();
- int limit = limit();
- ByteOrder bo = order();
-
- capacity0( newCapacity );
- buf.limit( limit );
- if( mark >= 0 )
- {
- buf.position( mark );
- buf.mark();
- }
- buf.position( pos );
- buf.order( bo );
+ throw new IllegalArgumentException();
}
return this;
}
- protected void capacity0( int requestedCapacity )
- {
- int newCapacity = MINIMUM_CAPACITY;
- while( newCapacity < requestedCapacity )
- {
- newCapacity <<= 1;
- }
-
- java.nio.ByteBuffer oldBuf = this.buf;
- java.nio.ByteBuffer newBuf;
- if( isDirect() )
- {
- newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
- }
- else
- {
- newBuf = java.nio.ByteBuffer.allocate( newCapacity );
- }
-
- newBuf.clear();
- oldBuf.clear();
- newBuf.put( oldBuf );
- this.buf = newBuf;
- }
-
public boolean isAutoExpand()
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java Thu Jun 19 02:01:59 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.common;
+import org.apache.qpid.framing.AMQShortString;
+
/**
* Specifies the available client property types that different clients can use to identify themselves with.
*
@@ -30,8 +32,21 @@
*/
public enum ClientProperties
{
- instance,
- product,
- version,
- platform
+ instance("instance"),
+ product("product"),
+ version("version"),
+ platform("platform");
+
+ private final AMQShortString _amqShortString;
+
+ private ClientProperties(String name)
+ {
+ _amqShortString = new AMQShortString(name);
+ }
+
+
+ public AMQShortString toAMQShortString()
+ {
+ return _amqShortString;
+ }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Jun 19 02:01:59 2008
@@ -224,7 +224,6 @@
}
}
-
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -464,13 +463,49 @@
return false;
}
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+ {
+ return false;
+ }
+
+ final int length = _length;
+
+ if(length != otherString._length)
{
return false;
}
- return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
- || Arrays.equals(getBytes(),otherString.getBytes());
+
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
}
@@ -718,4 +753,17 @@
return false; //To change body of created methods use File | Settings | File Templates.
}
+
+ public static void main(String args[])
+ {
+ AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+ AMQShortString s2 = s.substring(2, 7);
+
+ AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+ while(t.hasMoreTokens())
+ {
+ System.err.println(t.nextToken());
+ }
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Thu Jun 19 02:01:59 2008
@@ -93,4 +93,24 @@
{
return "[" + getType() + ": " + getValue() + "]";
}
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Thu Jun 19 02:01:59 2008
@@ -74,7 +74,7 @@
buffer.skip((int) length);
}
- private AMQTypedValue getProperty(AMQShortString string)
+ public AMQTypedValue getProperty(AMQShortString string)
{
checkPropertyName(string);
@@ -891,6 +891,20 @@
return keys;
}
+ public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+ {
+ if(_encodedForm != null)
+ {
+ return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+ }
+ else
+ {
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
+ }
+ }
+
+
public Object get(AMQShortString key)
{
@@ -1050,6 +1064,95 @@
}
}
+ private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+ {
+ private final AMQTypedValue _value;
+ private final AMQShortString _key;
+
+ public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+ {
+ _key = key;
+ _value = value;
+ }
+
+ public AMQShortString getKey()
+ {
+ return _key;
+ }
+
+ public AMQTypedValue getValue()
+ {
+ return _value;
+ }
+
+ public AMQTypedValue setValue(final AMQTypedValue value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof FieldTableEntry)
+ {
+ FieldTableEntry other = (FieldTableEntry) o;
+ return (_key == null ? other._key == null : _key.equals(other._key))
+ && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return (getKey()==null ? 0 : getKey().hashCode())
+ ^ (getValue()==null ? 0 : getValue().hashCode());
+ }
+
+ }
+
+
+ private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+ {
+
+ private final ByteBuffer _buffer;
+ private int _expectedRemaining;
+
+ public FieldTableIterator(ByteBuffer buffer, int length)
+ {
+ _buffer = buffer;
+ _expectedRemaining = buffer.remaining() - length;
+ }
+
+ public boolean hasNext()
+ {
+ return (_buffer.remaining() > _expectedRemaining);
+ }
+
+ public Map.Entry<AMQShortString, AMQTypedValue> next()
+ {
+ if(hasNext())
+ {
+ final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+ return new FieldTableEntry(key, value);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
public int hashCode()
{
initMapIfNecessary();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Jun 19 02:01:59 2008
@@ -50,7 +50,7 @@
*
* @todo For better re-usability could make the completion handler optional. Only run it when one is set.
*/
-public class Job implements Runnable
+public class Job implements ReadWriteRunnable
{
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
@@ -67,18 +67,22 @@
/** Holds the completion continuation, called upon completion of a run of the job. */
private final JobCompletionHandler _completionHandler;
+ private final boolean _readJob;
+
/**
* Creates a new job that aggregates many continuations together.
*
* @param session The Mina session.
* @param completionHandler The per job run, terminal continuation.
* @param maxEvents The maximum number of aggregated continuations to process per run of the job.
+ * @param readJob
*/
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
{
_session = session;
_completionHandler = completionHandler;
_maxEvents = maxEvents;
+ _readJob = readJob;
}
/**
@@ -157,6 +161,22 @@
}
}
+ public boolean isReadJob()
+ {
+ return _readJob;
+ }
+
+ public boolean isRead()
+ {
+ return _readJob;
+ }
+
+ public boolean isWrite()
+ {
+ return !_readJob;
+ }
+
+
/**
* Another interface for a continuation.
*
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Jun 19 02:01:59 2008
@@ -60,24 +60,6 @@
* <td> {@link Job}, {@link Job.JobCompletionHandler}
* </table>
*
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- * stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- * and trips off another batch of 10 until they are all done. Why not just have a straight forward
- * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- * in them, just have one queue of events and worker threads taking the next event. There will be coordination
- * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- * be substituted in.
- *
* @todo The static helper methods are pointless. Could just call new.
*/
public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -96,17 +78,20 @@
private final int _maxEvents;
+ private final boolean _readFilter;
+
/**
* Creates a named pooling filter, on the specified shared thread pool.
*
* @param refCountingPool The thread pool reference.
* @param name The identifying name of the filter type.
*/
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
{
_poolReference = refCountingPool;
_name = name;
_maxEvents = maxEvents;
+ _readFilter = readFilter;
}
/**
@@ -167,7 +152,6 @@
void fireAsynchEvent(Job job, Event event)
{
- // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
final ExecutorService pool = _poolReference.getPool();
@@ -201,7 +185,7 @@
*/
public void createNewJobForSession(IoSession session)
{
- Job job = new Job(session, this, MAX_JOB_EVENTS);
+ Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
session.setAttribute(_name, job);
}
@@ -433,7 +417,7 @@
*/
public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
}
/**
@@ -476,7 +460,7 @@
*/
public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
{
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+ super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
}
/**
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=669431&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Thu Jun 19 02:01:59 2008
@@ -0,0 +1,432 @@
+package org.apache.qpid.pool;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
+{
+
+ private final AtomicInteger _count = new AtomicInteger(0);
+
+ private final ReentrantLock _takeLock = new ReentrantLock();
+
+ private final Condition _notEmpty = _takeLock.newCondition();
+
+ private final ReentrantLock _putLock = new ReentrantLock();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
+
+
+ private class ReadWriteJobIterator implements Iterator<Runnable>
+ {
+
+ private boolean _onReads;
+ private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
+
+ public boolean hasNext()
+ {
+ if(!_iter.hasNext())
+ {
+ if(_onReads)
+ {
+ _iter = _readJobQueue.iterator();
+ _onReads = true;
+ return _iter.hasNext();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public Runnable next()
+ {
+ if(_iter.hasNext())
+ {
+ return _iter.next();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ _takeLock.lock();
+ try
+ {
+ _iter.remove();
+ _count.decrementAndGet();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ }
+
+ public Iterator<Runnable> iterator()
+ {
+ return new ReadWriteJobIterator();
+ }
+
+ public int size()
+ {
+ return _count.get();
+ }
+
+ public boolean offer(final Runnable runnable)
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+ public void put(final Runnable runnable) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+ }
+
+
+
+ public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
+ final ReentrantLock putLock = _putLock;
+ putLock.lock();
+
+ try
+ {
+ if(job.isRead())
+ {
+ _readJobQueue.offer(job);
+ }
+ else
+ {
+ _writeJobQueue.offer(job);
+ }
+ if(_count.getAndIncrement() == 0)
+ {
+ _takeLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _takeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+ finally
+ {
+ putLock.unlock();
+ }
+
+ }
+
+ public Runnable take() throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lockInterruptibly();
+ try
+ {
+ try
+ {
+ while (_count.get() == 0)
+ {
+ _notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = _count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+
+ }
+
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ final ReentrantLock takeLock = _takeLock;
+ final AtomicInteger count = _count;
+ long nanos = unit.toNanos(timeout);
+ takeLock.lockInterruptibly();
+ ReadWriteRunnable job = null;
+ try
+ {
+
+ for (;;)
+ {
+ if (count.get() > 0)
+ {
+ job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ int c = count.getAndDecrement();
+ if (c > 1)
+ {
+ _notEmpty.signal();
+ }
+ break;
+ }
+ if (nanos <= 0)
+ {
+ return null;
+ }
+ try
+ {
+ nanos = _notEmpty.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ return job;
+ }
+
+ public int remainingCapacity()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while((job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while((job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+ }
+
+ public int drainTo(final Collection<? super Runnable> c, final int maxElements)
+ {
+ int total = 0;
+
+ _putLock.lock();
+ _takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job;
+ while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _writeJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ while(total<=maxElements && (job = _readJobQueue.peek())!= null)
+ {
+ c.add(job);
+ _readJobQueue.poll();
+ _count.decrementAndGet();
+ total++;
+ }
+
+ }
+ finally
+ {
+ _takeLock.unlock();
+ _putLock.unlock();
+ }
+ return total;
+
+ }
+
+ public Runnable poll()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ if(_count.get() > 0)
+ {
+ ReadWriteRunnable job = _writeJobQueue.poll();
+ if(job == null)
+ {
+ job = _readJobQueue.poll();
+ }
+ _count.decrementAndGet();
+ return job;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+
+ }
+
+ public Runnable peek()
+ {
+ final ReentrantLock takeLock = _takeLock;
+ takeLock.lock();
+ try
+ {
+ ReadWriteRunnable job = _writeJobQueue.peek();
+ if(job == null)
+ {
+ job = _readJobQueue.peek();
+ }
+ return job;
+ }
+ finally
+ {
+ takeLock.unlock();
+ }
+ }
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=669431&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Thu Jun 19 02:01:59 2008
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+ boolean isRead();
+ boolean isWrite();
+}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Thu Jun 19 02:01:59 2008
@@ -22,6 +22,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -84,6 +87,8 @@
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -105,15 +110,28 @@
*
* @return An executor service.
*/
- ExecutorService acquireExecutorService()
+ public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
if (_refCount++ == 0)
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+// _pool = Executors.newFixedThreadPool(_poolSize);
+
+ // Use a job queue that biases to writes
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
+ }
+ else
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
}
+
return _pool;
}
}
@@ -122,7 +140,7 @@
* Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
* to zero, the executor service is shut down.
*/
- void releaseExecutorService()
+ public void releaseExecutorService()
{
synchronized (_lock)
{
Modified: incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java (original)
+++ incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java Thu Jun 19 02:01:59 2008
@@ -23,10 +23,11 @@
import junit.framework.TestCase;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.DestNameExchange;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueMBean;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -62,7 +63,8 @@
{
// If this test fails due to changes in the broker code,
// then the constants in the Constants.java shoule be updated accordingly
- AMQQueue queue = new AMQQueue(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost,
+ null);
AMQManagedObject mbean = new AMQQueueMBean(queue);
MBeanInfo mbeanInfo = mbean.getMBeanInfo();
@@ -82,7 +84,7 @@
{
// If this test fails due to changes in the broker code,
// then the constants in the Constants.java shoule be updated accordingly
- DestNameExchange exchange = new DestNameExchange();
+ DirectExchange exchange = new DirectExchange();
exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
AMQManagedObject mbean = (AMQManagedObject)exchange.getManagedObject();
MBeanInfo mbeanInfo = mbean.getMBeanInfo();
Modified: incubator/qpid/trunk/qpid/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/pom.xml?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/systests/pom.xml Thu Jun 19 02:01:59 2008
@@ -151,7 +151,6 @@
<MessageReturnTest>-n MessageReturnTest org.apache.qpid.server.queue.MessageReturnTest </MessageReturnTest>
<QueueDepthWithSelectorTest>-n QueueDepthWithSelectorTest org.apache.qpid.server.queue.QueueDepthWithSelectorTest </QueueDepthWithSelectorTest>
<!--<SubscriptionManagerTest>-n SubscriptionManagerTest org.apache.qpid.server.queue.SubscriptionManagerTest </SubscriptionManagerTest>-->
- <SubscriptionSetTest>-n SubscriptionSetTest org.apache.qpid.server.queue.SubscriptionSetTest </SubscriptionSetTest>
<TimeToLiveTest>-n TimeToLiveTest org.apache.qpid.server.queue.TimeToLiveTest </TimeToLiveTest>
<TxnBufferTest>-n TxnBufferTest org.apache.qpid.server.txn.TxnBufferTest </TxnBufferTest>
<!--<TxnTest>-n TxnTest org.apache.qpid.server.txn.TxnTest </TxnTest>-->
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Thu Jun 19 02:01:59 2008
@@ -26,12 +26,16 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -99,12 +103,16 @@
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
+ AMQQueue queue =
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()),
+ null);
+
for (int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
@@ -138,8 +146,8 @@
}
};
- TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map));
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+ _map.add(deliveryTag, queue.enqueue(new StoreContext(), message));
}
_acked = acked;
_unacked = unacked;
@@ -154,7 +162,7 @@
{
for (long tag : tags)
{
- UnacknowledgedMessage u = _map.get(tag);
+ QueueEntry u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
((TestMessage) u.getMessage()).assertCountEquals(expected);
}
@@ -175,7 +183,7 @@
_op.consolidate();
_op.undoPrepare();
- assertCount(_acked, 1); //DTX Changed to 0, but that is wrong msg 5 is acked!
+ assertCount(_acked, 1);
assertCount(_unacked, 0);
}
@@ -195,34 +203,50 @@
}
}
+ private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+ {
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ null,
+ false);
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+ publishBody,
+ new ContentHeaderBody()
+ {
+ public int getSize()
+ {
+ return 1;
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ // won't happen
+ }
+
+
+ return amqMessageHandle;
+ }
+
+
private class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ throws AMQException
{
- super(messageId, publishBody, txnContext);
- try
- {
- setContentHeaderBody(new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
+ super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
_tag = tag;
}
- public void incrementReference()
+
+ public boolean incrementReference()
{
_count++;
+ return true;
}
public void decrementReference(StoreContext context)
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Thu Jun 19 02:01:59 2008
@@ -24,18 +24,17 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
import java.util.*;
@@ -50,7 +49,7 @@
/**
* Not used in this test, just there to stub out the routing calls
*/
- private MessageStore _store = new SkeletonMessageStore();
+ private MessageStore _store = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
@@ -94,7 +93,11 @@
protected void route(Message m) throws AMQException
{
m.route(exchange);
- m.routingComplete(_store, _storeContext, _handleFactory);
+ m.getIncomingMessage().routingComplete(_store, _handleFactory);
+ if(m.getIncomingMessage().allContentReceived())
+ {
+ m.getIncomingMessage().deliverToQueues();
+ }
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -122,12 +125,12 @@
{
if (expected.contains(q))
{
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m));
//assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
}
else
{
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m));
//assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
}
}
@@ -234,7 +237,7 @@
return properties;
}
- static class TestQueue extends AMQQueue
+ static class TestQueue extends SimpleAMQQueue
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
@@ -248,13 +251,167 @@
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
* @param msg
- * @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
+ @Override
+ public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+ {
+ messages.add( new HeadersExchangeTest.Message(msg));
+ return new QueueEntry()
+ {
+
+ public AMQQueue getQueue()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public AMQMessage getMessage()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getSize()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean getDeliveredToConsumer()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean expired() throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isAcquired()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean acquire(Subscription sub)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean delete()
+ {
+ return false;
+ }
+
+ public boolean isDeleted()
+ {
+ return false;
+ }
+
+ public boolean acquiredBySubscription()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setDeliveredToSubscription()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void release()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String debugIdentity()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean immediateAndNotDelivered()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setRedelivered(boolean b)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void reject(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void requeue(StoreContext storeContext) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void restoreCredit()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isQueueDeleted()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addStateChangeListener(StateChangeListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean removeStateChangeListener(StateChangeListener listener)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int compareTo(final QueueEntry o)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
+ }
+
+ boolean isInQueue(Message msg)
{
- messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
+ return messages.contains(msg);
}
+
}
/**
@@ -262,10 +419,44 @@
*/
static class Message extends AMQMessage
{
- private static MessageStore _messageStore = new MemoryMessageStore();
+ private class TestIncomingMessage extends IncomingMessage
+ {
+
+ public TestIncomingMessage(final long messageId,
+ final MessagePublishInfo info,
+ final TransactionalContext txnContext,
+ final AMQProtocolSession publisher)
+ {
+ super(messageId, info, txnContext, publisher);
+ }
+
+
+ public AMQMessage getUnderlyingMessage()
+ {
+ return Message.this;
+ }
+
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ try
+ {
+ return Message.this.getContentHeaderBody();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private IncomingMessage _incoming;
+
+ private static MessageStore _messageStore = new SkeletonMessageStore();
private static StoreContext _storeContext = new StoreContext();
+
private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
@@ -278,12 +469,47 @@
Message(String id, FieldTable headers) throws AMQException
{
- this(getPublishRequest(id), getContentHeader(headers), null);
+ this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ }
+
+ public IncomingMessage getIncomingMessage()
+ {
+ return _incoming;
+ }
+
+ private Message(long messageId,
+ MessagePublishInfo publish,
+ ContentHeaderBody header,
+ List<ContentBody> bodies) throws AMQException
+ {
+ super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+
+
+
+ _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+ _incoming.setContentHeaderBody(header);
+
+
}
- private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ private static AMQMessageHandle createMessageHandle(final long messageId,
+ final MessagePublishInfo publish,
+ final ContentHeaderBody header)
{
- super(_messageStore.getNewMessageId(), publish, _txnContext, header);
+
+ final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
+ _messageStore,
+ true);
+
+ try
+ {
+ amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+ }
+ catch (AMQException e)
+ {
+
+ }
+ return amqMessageHandle;
}
private Message(AMQMessage msg) throws AMQException
@@ -291,15 +517,13 @@
super(msg);
}
+
+
void route(Exchange exchange) throws AMQException
{
- exchange.route(this);
+ exchange.route(_incoming);
}
- boolean isInQueue(TestQueue queue)
- {
- return queue.messages.contains(this);
- }
public int hashCode()
{
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=669431&r1=669430&r2=669431&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Jun 19 02:01:59 2008
@@ -28,16 +28,15 @@
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
-
/**
* Test class to test MBean operations for AMQMinaProtocolSession.
*/
@@ -56,13 +55,12 @@
// check the channel count is correct
int channelCount = _mbean.channels().size();
assertTrue(channelCount == 1);
- AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()),
false,
new AMQShortString("test"),
true,
- _protocolSession.getVirtualHost());
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
-
+ _protocolSession.getVirtualHost(), null);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();