You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [2/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker/...
Modified: incubator/qpid/branches/qpid.0-10/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/010ExcludeList?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/010ExcludeList (original)
+++ incubator/qpid/branches/qpid.0-10/java/010ExcludeList Thu Aug 14 20:40:49 2008
@@ -4,6 +4,7 @@
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
+org.apache.qpid.test.unit.ack.AcknowledgeTest#*
// those tests need durable subscribe states to be persisted
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
// those tests require broker recovery
@@ -32,6 +33,7 @@
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
// Those tests are testing 0.8 specific semantics
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
@@ -45,5 +47,19 @@
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxP2P
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
+org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
// the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+// 0-10 c++ broker doesn't implement virtual hosts, or those wackhy exchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
+// c++ broker doesn't do selectors, so this will fail
+org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
Modified: incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store (original)
+++ incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store Thu Aug 14 20:40:49 2008
@@ -5,6 +5,7 @@
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
+org.apache.qpid.test.unit.ack.AcknowledgeTest#*
// those tests need durable subscribe states to be persisted
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testDurSubRestoresMessageSelector
@@ -27,6 +28,7 @@
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
// Those tests are testing 0.8 specific semantics
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
@@ -40,5 +42,16 @@
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxP2P
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
+org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
// the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+// The default cpp.testprofile does not start the cpp broker with authentication so this test will fail.
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
+// c++ broker doesn't do selectors, so this will fail
+org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
Modified: incubator/qpid/branches/qpid.0-10/java/08ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/08ExcludeList?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/08ExcludeList (original)
+++ incubator/qpid/branches/qpid.0-10/java/08ExcludeList Thu Aug 14 20:40:49 2008
@@ -2,7 +2,8 @@
org.apache.qpid.test.unit.xa.TopicTest#*
org.apache.qpid.test.unit.xa.FaultTest#*
org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurableWithInvalidSelector
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
+// This is a long running test so should exclude from normal runs
+org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover
Modified: incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -34,7 +36,7 @@
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -191,7 +193,7 @@
return false;
}
- public void route(AMQMessage payload) throws AMQException
+ public void route(IncomingMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
@@ -201,17 +203,14 @@
headers.put(key, value);
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-
- payload.enqueue(q);
+
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(q);
+ payload.enqueue(queues);
}
- @Override
- public Map<AMQShortString, List<AMQQueue>> getBindings() {
- // TODO Auto-generated method stub
- return null;
- }
-
+
public boolean isBound(AMQShortString routingKey, FieldTable arguments,
AMQQueue queue) {
// TODO Auto-generated method stub
Modified: incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Thu Aug 14 20:40:49 2008
@@ -1,24 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
package org.apache.qpid.extras.exchanges.example;
import java.util.List;
@@ -28,7 +7,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -102,7 +81,7 @@
{
}
- public void route(AMQMessage message) throws AMQException
+ public void route(IncomingMessage message) throws AMQException
{
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd Thu Aug 14 20:40:49 2008
@@ -18,6 +18,11 @@
# under the License.
#
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME="$(dirname "$(dirname "$(readlink -f "$0")")")"
+ export PATH="${PATH}:${QPID_HOME}/bin"
+fi
+
# Set classpath to include Qpid jar with all required jars in manifest
QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server Thu Aug 14 20:40:49 2008
@@ -18,6 +18,11 @@
# under the License.
#
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME="$(dirname "$(dirname "$(readlink -f "$0")")")"
+ export PATH="${PATH}:${QPID_HOME}/bin"
+fi
+
# Set classpath to include Qpid jar with all required jars in manifest
QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
@@ -26,6 +31,7 @@
JAVA_VM=-server \
JAVA_MEM=-Xmx1024m \
JAVA_GC="-XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
- QPID_CLASSPATH=$QPID_LIBS
+ QPID_CLASSPATH=$QPID_LIBS \
+ QPID_RUN_LOG=2
. qpid-run org.apache.qpid.server.Main "$@"
Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
@REM under the License.
@REM
-echo off
+@echo off
REM Script to run the Qpid Java Broker
rem Guess QPID_HOME if not defined
@@ -36,35 +36,168 @@
goto end
:okHome
+REM set QPID_WORK if not set
+if not "%QPID_WORK%" == "" goto okQpidWork
+if "%HOME%" == "" goto noHome
+set QPID_WOKR=%HOME%
+goto okQpidWork
+
+:noHome
+set QPID_WORK=c:\Temp
+if not exist %QPID_WORK% md %QPID_WORK%
+:okQpidWork
+
if not "%JAVA_HOME%" == "" goto gotJavaHome
echo The JAVA_HOME environment variable is not defined
echo This environment variable is needed to run this program
-goto exit
+goto end
:gotJavaHome
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
goto okJavaHome
:noJavaHome
echo The JAVA_HOME environment variable is not defined correctly
echo This environment variable is needed to run this program.
-goto exit
+goto end
:okJavaHome
-rem Slurp the command line arguments. This loop allows for an unlimited number
-rem of agruments (up to the command line limit, anyway).
-set QPID_ARGS=%1
-if ""%1""=="""" goto runCommand
-shift
-:loop
-if ""%1""=="""" goto runCommand
-set QPID_ARGS=%QPID_ARGS% %1
+REM set logging level if not set
+if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info
+
+REM Set the default system properties now that they have all been initialised.
+REM QPID_HOME and QPID_WORK are quoted so that paths containing spaces survive.
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
+
+if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
+
+REM Use QPID_CLASSPATH if set
+if "%QPID_CLASSPATH%" == "" goto noQpidClasspath
+set CLASSPATH=%QPID_CLASSPATH%
+echo Using CLASSPATH: %CLASSPATH%
+goto afterQpidClasspath
+
+:noQpidClasspath
+echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-incubating.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-incubating.jar
+:afterQpidClasspath
+
+REM start parsing -run arguments
+set QPID_ARGS=
+if "%1" == "" goto endRunArgs
+:runLoop
+set var=%1
+if "%var:~0,5%" == "-run:" goto runFound
+set QPID_ARGS=%QPID_ARGS% %1
+:beforeRunShift
shift
-goto loop
+if not "%1"=="" goto runLoop
+goto endRunArgs
+
+:runFound
+if "%var%" == "-run:debug" goto runDebug
+if "%var%" == "-run:jpda" goto runJdpa
+if "%var:~0,24%" == "-run:external-classpath-" goto runExternalClasspath
+if "%var%" == "-run:print-classpath" goto runPrintCP
+if "%var%" == "-run:help" goto runHelp
+echo "unrecognized -run option '%var%'. For using external classpaths use -run:external-classpath-option"
+goto end
+
+:runDebug
+REM USAGE: print the classpath and command before running it
+set debug=true
+goto beforeRunShift
+
+:runJdpa
+REM USAGE: adds debugging options to the java command, use
+REM USAGE: JPDA_TRANSPORT and JPDA_ADDRESS to customize the debugging
+REM USAGE: behavior and use JPDA_OPTS to override it entirely
+REM Build the default JPDA_OPTS only when the caller has not supplied one.
+if "%JPDA_TRANSPORT%" == "" set JPDA_TRANSPORT=dt_socket
+if "%JPDA_ADDRESS%" == "" set JPDA_ADDRESS=8000
+if "%JPDA_OPTS%" == "" set JPDA_OPTS=-Xdebug -Xrunjdwp:transport=%JPDA_TRANSPORT%,address=%JPDA_ADDRESS%,server=y,suspend=n
+set QPID_OPTS=%QPID_OPTS% %JPDA_OPTS%
+goto beforeRunShift
+
+:runExternalClasspath
+echo Using external classpath %var%
+REM USAGE: Format is -run:external-classpath-first/last/ignore/only as equals special in DOS
+REM USAGE: controls how the CLASSPATH environment variable is used by
+REM USAGE: this script, value can be one of ignore (the default), first,
+REM USAGE: last, and only
+if "%var%" == "-run:external-classpath-ignore" goto beforeRunShift
+if "%var%" == "-run:external-classpath-first" goto extCPFirst
+if "%var%" == "-run:external-classpath-last" goto extCPLast
+if "%var%" == "-run:external-classpath-only" goto extCPOnly
+echo Invalid value provided for external classpath.
+goto end
+
+:extCPFirst
+set CLASSPATH=%EXTERNAL_CLASSPATH%;%CLASSPATH%
+goto beforeRunShift
+
+:extCPLast
+set CLASSPATH=%CLASSPATH%;%EXTERNAL_CLASSPATH%
+goto beforeRunShift
+
+:extCPonly
+set CLASSPATH=%EXTERNAL_CLASSPATH%
+goto beforeRunShift
+
+:runPrintCP
+REM USAGE: print the classpath
+echo %CLASSPATH%
+goto beforeRunShift
+
+:runHelp
+REM USAGE: print this message
+echo -------------------------------------------------------------------------------------------
+echo -run:option where option can be the following.
+echo debug : Prints classpath and command before running it
+echo jpda : Adds remote debugging info using JPDA_OPTS. Use JPDA_TRANSPORT and JPDA_ADDRESS to
+echo customize, JPDA_OPTS to override
+echo external-classpath : Valid values are: ignore, first, last and only.
+echo print-classpath : Prints classpath before running command
+echo help : Prints this message
+echo --------------------------------------------------------------------------------------------
+goto end
+
+REM end parsing -run arguments
+:endRunArgs
+
+set JAVA_VM=-server
+set JAVA_MEM=-Xmx1024m
+set JAVA_GC=-XX:+UseConcMarkSweepGC
+rem removing the following vm arg from JAVA_GC as it is supported only in Java 1.6
+rem -XX:+HeapDumpOnOutOfMemoryError"
+
+REM Use QPID_JAVA_GC if set, otherwise keep the JAVA_GC default chosen above
+if "%QPID_JAVA_GC%" == "" goto noQpidJavaGC
+set JAVA_GC=%QPID_JAVA_GC%
+echo Using QPID_JAVA_GC setting: %QPID_JAVA_GC%
+goto afterQpidJavaGC
+
+:noQpidJavaGC
+echo Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC %JAVA_GC%
+:afterQpidJavaGC
+
+REM Use QPID_JAVA_MEM if set, otherwise keep the JAVA_MEM default chosen above
+if "%QPID_JAVA_MEM%" == "" goto noQpidJavaMem
+set JAVA_MEM=%QPID_JAVA_MEM%
+echo Using QPID_JAVA_MEM setting: %QPID_JAVA_MEM%
+goto afterQpidJavaMem
+
+:noQpidJavaMem
+echo Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM %JAVA_MEM%
+:afterQpidJavaMem
+
rem QPID_OPTS intended to hold any -D props for use
rem user must enclose any value for QPID_OPTS in double quotes
:runCommand
-set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%\bin\java" -server -Xmx1024m %QPID_OPTS% -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+set COMMAND="%JAVA_HOME%\bin\java" %JAVA_VM% %JAVA_MEM% %JAVA_GC% %QPID_OPTS% %SYSTEM_PROPS% -cp "%CLASSPATH%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+REM With -run:debug, print exactly the classpath and command that will be run.
+if "%debug%" == "true" echo %CLASSPATH%;%MODULE_JARS%
+if "%debug%" == "true" echo %COMMAND%
+%COMMAND%
:end
Modified: incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml Thu Aug 14 20:40:49 2008
@@ -93,7 +93,7 @@
<queues>
<exchange>amq.direct</exchange>
<!-- 4Mb -->
- <maximumQueueDepth>4235g264</maximumQueueDepth>
+ <maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 2Mb -->
<maximumMessageSize>2117632</maximumMessageSize>
<!-- 10 mins -->
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj Thu Aug 14 20:40:49 2008
@@ -94,7 +94,7 @@
return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql, e);
+ throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql,e);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Aug 14 20:40:49 2008
@@ -56,8 +56,9 @@
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.ManagedBroker;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -175,7 +176,8 @@
ownerShortString = new AMQShortString(owner);
}
- queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
+ queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(),
+ null);
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
@@ -220,7 +222,7 @@
try
{
queue.delete();
- _messageStore.removeQueue(new AMQShortString(queueName));
+ _messageStore.removeQueue(queue);
}
catch (AMQException ex)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Aug 14 20:40:49 2008
@@ -21,7 +21,6 @@
package org.apache.qpid.server;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
@@ -30,14 +29,23 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -45,13 +53,13 @@
import org.apache.qpid.server.txn.TransactionalContext;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQChannel
{
@@ -61,13 +69,8 @@
private final int _channelId;
- // private boolean _transactional;
-
- private long _prefetch_HighWaterMark;
- private long _prefetch_LowWaterMark;
-
- private long _prefetchSize;
+ private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -86,10 +89,11 @@
* been received by this channel. As the frames are received the message gets updated and once all frames have been
* received the message can then be routed.
*/
- private AMQMessage _currentMessage;
+ private IncomingMessage _currentMessage;
+
+ /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
+ private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
- /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -97,7 +101,7 @@
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private TransactionalContext _txnContext, _nonTransactedContext;
+ private TransactionalContext _txnContext;
/**
* A context used by the message store enabling it to track context for a given channel even across thread
@@ -109,8 +113,6 @@
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
- private Set<Long> _browsedAcks = new HashSet<Long>();
-
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
private boolean _closing;
@@ -118,7 +120,7 @@
@Configured(path = "advanced.enableJMSXUserID",
defaultValue = "false")
public boolean ENABLE_JMSXUserID;
-
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -129,8 +131,8 @@
_session = session;
_channelId = channelId;
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
- _prefetch_HighWaterMark = DEFAULT_PREFETCH;
- _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
+
+
_messageStore = messageStore;
// by default the session is non-transactional
@@ -140,7 +142,7 @@
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+ _txnContext = new LocalTransactionalContext(this);
}
public boolean isTransactional()
@@ -156,55 +158,15 @@
return _channelId;
}
- public long getPrefetchCount()
- {
- return _prefetch_HighWaterMark;
- }
-
- public void setPrefetchCount(long prefetchCount)
- {
- _prefetch_HighWaterMark = prefetchCount;
- }
-
- public long getPrefetchSize()
- {
- return _prefetchSize;
- }
-
- public void setPrefetchSize(long prefetchSize)
- {
- _prefetchSize = prefetchSize;
- }
-
- public long getPrefetchLowMarkCount()
- {
- return _prefetch_LowWaterMark;
- }
-
- public void setPrefetchLowMarkCount(long prefetchCount)
- {
- _prefetch_LowWaterMark = prefetchCount;
- }
-
- public long getPrefetchHighMarkCount()
- {
- return _prefetch_HighWaterMark;
- }
-
- public void setPrefetchHighMarkCount(long prefetchCount)
- {
- _prefetch_HighWaterMark = prefetchCount;
- }
-
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
- _currentMessage.setPublisher(publisher);
+ _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
+ _currentMessage.setMessageStore(_messageStore);
_currentMessage.setExchange(e);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody)
throws AMQException
{
if (_currentMessage == null)
@@ -215,7 +177,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
+ _log.debug("Content header received on channel " + _channelId);
}
if (ENABLE_JMSXUserID)
@@ -225,25 +187,48 @@
//fixme: fudge for QPID-677
properties.getHeaders().keySet();
- properties.setUserId(protocolSession.getAuthorizedID().getName());
+ properties.setUserId(_session.getAuthorizedID().getName());
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+
_currentMessage.setExpiration();
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
- // check and deliver if header says body length is zero
- if (contentHeaderBody.bodySize == 0)
+ _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+
+ deliverCurrentMessageIfComplete();
+
+ }
+ }
+
+ private void deliverCurrentMessageIfComplete()
+ throws AMQException
+ {
+ // check and deliver if header says body length is zero
+ if (_currentMessage.allContentReceived())
+ {
+ try
{
- _txnContext.messageProcessed(protocolSession);
+ _currentMessage.deliverToQueues();
+ }
+ catch (NoRouteException e)
+ {
+ _returnMessages.add(e);
+ }
+ finally
+ {
+ // callback to allow the context to do any post message processing
+ // primary use is to allow message return processing in the non-tx case
+ _txnContext.messageProcessed(_session);
_currentMessage = null;
}
}
+
}
- public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
+ public void publishContentBody(ContentBody contentBody) throws AMQException
{
if (_currentMessage == null)
{
@@ -260,15 +245,11 @@
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext,
- protocolSession.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
- contentBody)))
- {
- // callback to allow the context to do any post message processing
- // primary use is to allow message return processing in the non-tx case
- _txnContext.messageProcessed(protocolSession);
- _currentMessage = null;
- }
+ _currentMessage.addContentBodyFrame(
+ _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody));
+
+ deliverCurrentMessageIfComplete();
}
catch (AMQException e)
{
@@ -287,6 +268,7 @@
}
catch (NoRouteException e)
{
+ //_currentMessage.incrementReference();
_returnMessages.add(e);
}
}
@@ -307,18 +289,17 @@
*
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
- * @param session the protocol session of the subscriber
- * @param noLocal Flag stopping own messages being receivied.
- * @param exclusive Flag requesting exclusive access to the queue
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
+ * @param noLocal Flag stopping own messages being received.
+ * @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
@@ -326,77 +307,65 @@
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_consumerTag2QueueMap.containsKey(tag))
+ if (_tag2SubscriptionMap.containsKey(tag))
{
throw new ConsumerTagNotUniqueException();
}
+ Subscription subscription =
+ SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+
+
+ // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _consumerTag2QueueMap.put(tag, queue);
+
+ _tag2SubscriptionMap.put(tag, subscription);
try
{
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ queue.registerSubscription(subscription, exclusive);
}
catch (AMQException e)
{
- _consumerTag2QueueMap.remove(tag);
+ _tag2SubscriptionMap.remove(tag);
throw e;
}
-
return tag;
}
/**
* Unsubscribe a consumer from a queue.
- * @param session
* @param consumerTag
* @return true if the consumerTag had a mapped queue that could be unregistered.
* @throws AMQException
*/
- public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+ public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
-
- return true;
- }
- public void visitComplete()
- {
- }
- });
- }
-
- AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
- if (q != null)
+ Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ if (sub != null)
{
- q.unregisterProtocolSession(session, _channelId, consumerTag);
+ sub.getQueue().unregisterSubscription(sub);
return true;
}
+ else
+ {
+ _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+ }
return false;
}
/**
* Called from the protocol session to close this channel and clean up. T
*
- * @param session The session to close
- *
* @throws AMQException if there is an error during closure
*/
- public void close(AMQProtocolSession session) throws AMQException
+ public void close() throws AMQException
{
_txnContext.rollback();
- unsubscribeAllConsumers(session);
+ unsubscribeAllConsumers();
try
{
requeue();
@@ -414,11 +383,11 @@
_closing = closing;
}
- private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+ private void unsubscribeAllConsumers() throws AMQException
{
if (_log.isInfoEnabled())
{
- if (!_consumerTag2QueueMap.isEmpty())
+ if (!_tag2SubscriptionMap.isEmpty())
{
_log.info("Unsubscribing all consumers on channel " + toString());
}
@@ -428,17 +397,19 @@
}
}
- for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+ for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
{
if (_log.isInfoEnabled())
{
_log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+ Subscription sub = me.getValue();
+
+ sub.getQueue().unregisterSubscription(sub);
}
- _consumerTag2QueueMap.clear();
+ _tag2SubscriptionMap.clear();
}
/**
@@ -447,9 +418,9 @@
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param consumerTag The tag for the consumer that is to acknowledge this message.
+ * @param subscription The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
+ public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
{
if (_log.isDebugEnabled())
{
@@ -462,16 +433,13 @@
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
+ + ") with a queue(" + entry.getQueue() + ") for " + subscription);
}
}
}
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap));
- checkSuspension();
- }
+ _unacknowledgedMessageMap.add(deliveryTag, entry);
+
}
private final String id = "(" + System.identityHashCode(this) + ")";
@@ -490,7 +458,7 @@
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
@@ -505,13 +473,9 @@
if (!(_txnContext instanceof NonTransactionalContext))
{
- // if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
+ deliveryContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
}
else
{
@@ -519,22 +483,23 @@
}
}
- for (UnacknowledgedMessage unacked : messagesToBeDelivered)
+ for (QueueEntry unacked : messagesToBeDelivered)
{
if (!unacked.isQueueDeleted())
{
- // Ensure message is released for redelivery
- unacked.entry.release();
-
// Mark message redelivered
unacked.getMessage().setRedelivered(true);
+ // Ensure message is released for redelivery
+ unacked.release();
+
// Deliver Message
- deliveryContext.deliver(unacked.entry, false);
+ deliveryContext.requeue(unacked);
- // Should we allow access To the DM to directy deliver the message?
- // As we don't need to check for Consumers or worry about incrementing the message count?
- // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+ }
+ else
+ {
+ unacked.discard(_storeContext);
}
}
@@ -549,32 +514,29 @@
*/
public void requeue(long deliveryTag) throws AMQException
{
- UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
+ // Mark message redelivered
+ unacked.getMessage().setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
{
- unacked.entry.release();
+ unacked.release();
}
- // Mark message redelivered
- unacked.getMessage().setRedelivered(true);
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- // if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
+
+ deliveryContext =
new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
}
else
{
@@ -584,7 +546,7 @@
if (!unacked.isQueueDeleted())
{
// Redeliver the messages to the front of the queue
- deliveryContext.deliver(unacked.entry, true);
+ deliveryContext.requeue(unacked);
// Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
}
@@ -592,11 +554,8 @@
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- // _log.error("Requested requeue of message:" + deliveryTag +
- // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
- //
- // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
- //
+
+ unacked.discard(_storeContext);
}
}
else
@@ -604,25 +563,6 @@
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ _unacknowledgedMessageMap.size());
- if (_log.isDebugEnabled())
- {
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(
- (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
-
- return false; // Continue
- }
-
- public void visitComplete()
- {
- }
- });
- }
}
}
@@ -636,8 +576,10 @@
*/
public void resend(final boolean requeue) throws AMQException
{
- final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
- final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+
+ final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
if (_log.isDebugEnabled())
{
@@ -647,23 +589,25 @@
// Process the Unacked-Map.
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
+
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
- AMQShortString consumerTag = message.consumerTag;
+
AMQMessage msg = message.getMessage();
msg.setRedelivered(true);
- if (consumerTag != null)
+ final Subscription subscription = message.getDeliveredSubscription();
+ if (subscription != null)
{
// Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
+ if (!subscription.isClosed())
{
- msgToResend.add(message);
+ msgToResend.put(deliveryTag, message);
}
else // consumer has gone
{
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
}
else
@@ -675,7 +619,7 @@
{
if (requeue)
{
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
else
{
@@ -684,7 +628,8 @@
}
else
{
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ message.discard(_storeContext);
+ _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
@@ -697,6 +642,8 @@
}
});
+ _unacknowledgedMessageMap.clear();
+
// Process Messages to Resend
if (_log.isDebugEnabled())
{
@@ -710,9 +657,15 @@
}
}
- for (UnacknowledgedMessage message : msgToResend)
+ for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
{
+ QueueEntry message = entry.getValue();
+ long deliveryTag = entry.getKey();
+
+
+
AMQMessage msg = message.getMessage();
+ AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -727,46 +680,20 @@
// else
// {
// release to allow it to be delivered
- message.entry.release();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = message.entry.getDeliveredSubscription();
+ Subscription sub = message.getDeliveredSubscription();
if (sub != null)
{
- // Get the lock so we can tell if the sub scription has closed.
- // will stop delivery to this subscription until the lock is released.
- // note: this approach would allow the use of a single queue if the
- // PreDeliveryQueue would allow head additions.
- // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
- // needs guidance from AMQP WG Model SIG
- synchronized (sub.getSendLock())
+
+ if(!queue.resend(message, sub))
{
- if (sub.isClosed())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Subscription(" + System.identityHashCode(sub)
- + ") closed during resend so requeuing message");
- }
- // move this message to requeue
- msgToRequeue.add(message);
- }
- else
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
- + System.identityHashCode(sub));
- }
-
- sub.addToResendQueue(message.entry);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
- }
- } // sync(sub.getSendLock)
+ msgToRequeue.put(deliveryTag, message);
+ }
}
else
{
@@ -777,7 +704,7 @@
+ ")to prevent loss");
}
// move this message to requeue
- msgToRequeue.add(message);
+ msgToRequeue.put(deliveryTag, message);
}
} // for all messages
// } else !isSuspend
@@ -795,13 +722,9 @@
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- if (_nonTransactedContext == null)
- {
- _nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- deliveryContext = _nonTransactedContext;
+ deliveryContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
}
else
{
@@ -809,14 +732,17 @@
}
// Process Messages to Requeue at the front of the queue
- for (UnacknowledgedMessage message : msgToRequeue)
+ for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
{
- message.entry.release();
- message.entry.setRedelivered(true);
+ QueueEntry message = entry.getValue();
+ long deliveryTag = entry.getKey();
+
+ message.release();
+ message.setRedelivered(true);
- deliveryContext.deliver(message.entry, true);
+ deliveryContext.requeue(message);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
+ _unacknowledgedMessageMap.remove(deliveryTag);
}
}
@@ -827,38 +753,47 @@
*
* @param queue the queue that has been deleted
*
- * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(final AMQQueue queue) throws AMQException
+ /* public void queueDeleted(final AMQQueue queue)
{
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ try
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- if (message.getQueue() == queue)
+ public boolean callback(UnacknowledgedMessage message)
{
- try
+ if (message.getQueue() == queue)
{
- message.discard(_storeContext);
- message.setQueueDeleted(true);
+ try
+ {
+ message.discard(_storeContext);
+ message.setQueueDeleted(true);
+ }
+ catch (AMQException e)
+ {
+ _log.error(
+ "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
- {
- _log.error(
- "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
- }
+
+ return false;
}
- return false;
- }
+ public void visitComplete()
+ {
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unexpected Error while handling deletion of queue", e);
+ throw new RuntimeException(e);
+ }
- public void visitComplete()
- {
- }
- });
}
-
+*/
/**
* Acknowledge one or more messages.
*
@@ -870,23 +805,7 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- synchronized (_unacknowledgedMessageMap.getLock())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
- }
-
- }
-
- checkSuspension();
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
}
/**
@@ -899,43 +818,22 @@
return _unacknowledgedMessageMap;
}
- private void checkSuspension()
- {
- boolean suspend;
-
- suspend =
- ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
- || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
-
- setSuspended(suspend);
- }
public void setSuspended(boolean suspended)
{
- boolean isSuspended = _suspended.get();
- if (isSuspended && !suspended)
- {
- // Continue being suspended if we are above the _prefetch_LowWaterMark
- suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
- }
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
if (wasSuspended)
{
- _log.debug("Unsuspending channel " + this);
// may need to deliver queued messages
- for (AMQQueue q : _consumerTag2QueueMap.values())
+ for (Subscription s : _tag2SubscriptionMap.values())
{
- q.deliverAsync();
+ s.getQueue().deliverAsync(s);
}
}
- else
- {
- _log.debug("Suspending channel " + this);
- }
}
}
@@ -961,12 +859,7 @@
public String toString()
{
- StringBuilder sb = new StringBuilder(30);
- sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
- sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
- sb.append("/").append(_prefetch_HighWaterMark);
-
- return sb.toString();
+ return "["+_session.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
@@ -984,14 +877,14 @@
return _storeContext;
}
- public void processReturns(AMQProtocolSession session) throws AMQException
+ public void processReturns() throws AMQException
{
if (!_returnMessages.isEmpty())
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
new AMQShortString(bouncedMessage.getMessage()));
message.decrementReference(_storeContext);
@@ -1001,40 +894,68 @@
}
}
- public boolean wouldSuspend(AMQMessage msg)
+
+ public TransactionalContext getTransactionalContext()
{
- if (isSuspended())
- {
- return true;
- }
- else
- {
- boolean willSuspend =
- ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
- if (!willSuspend)
- {
- final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+ return _txnContext;
+ }
- willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
- }
+ public boolean isClosing()
+ {
+ return _closing;
+ }
- if (willSuspend)
- {
- setSuspended(true);
- }
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _session;
+ }
- return willSuspend;
- }
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+ public void setCredit(final long prefetchSize, final int prefetchCount)
+ {
+ _creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
- public TransactionalContext getTransactionalContext()
+ public List<RequiredDeliveryException> getReturnMessages()
{
- return _txnContext;
+ return _returnMessages;
}
- public boolean isClosing()
+ public MessageStore getMessageStore()
{
- return _closing;
+ return _messageStore;
+ }
+
+ private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
+ {
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ }
+ };
+
+ public ClientDeliveryMethod getClientDeliveryMethod()
+ {
+ return _clientDeliveryMethod;
+ }
+
+ private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ {
+ addUnacknowledgedMessage(entry, deliveryTag, sub);
+ }
+ };
+
+ public RecordDeliveryMethod getRecordDeliveryMethod()
+ {
+ return _recordDeliveryMethod;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Aug 14 20:40:49 2008
@@ -34,7 +34,6 @@
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.FixedSizeByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -279,6 +278,12 @@
ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
+
+ if(connectorConfig.useBiasedWrites)
+ {
+ System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+ }
+
int port = connectorConfig.port;
String portStr = commandLine.getOptionValue("p");
@@ -415,7 +420,8 @@
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
- acceptor.bind(bindAddress, handler, sconfig);
+ bind(acceptor, bindAddress, handler, sconfig);
+
//fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -426,7 +432,8 @@
try
{
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+ bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+
//fixme qpid.AMQP should be using qpidproperties to get value
_brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
@@ -449,6 +456,23 @@
}
}
+ /**
+ * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
+ *
+ * @param acceptor
+ * @param bindAddress
+ * @param handler
+ * @param sconfig
+ *
+ * @throws IOException from the acceptor.bind command
+ */
+ private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
+ {
+ acceptor.bind(bindAddress, handler, sconfig);
+
+ ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
+ }
+
public static void main(String[] args)
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Thu Aug 14 20:40:49 2008
@@ -39,19 +39,30 @@
*/
public abstract class RequiredDeliveryException extends AMQException
{
- private final AMQMessage _amqMessage;
+ private AMQMessage _amqMessage;
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
+ setMessage(payload);
+ }
+
+
+ public RequiredDeliveryException(String message)
+ {
+ super(message);
+ }
+
+ public void setMessage(final AMQMessage payload)
+ {
+
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
_amqMessage = payload.takeReference();
- // payload.incrementReference();
}
public AMQMessage getAMQMessage()
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,14 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.queue.QueueEntry;
/**
* A TxnOp implementation for handling accumulated acks
@@ -34,7 +37,7 @@
public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
- private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>();
+ private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
private List<Long> _individual;
private long _deliveryTag;
private boolean _multiple;
@@ -46,11 +49,12 @@
public void update(long deliveryTag, boolean multiple)
{
+ _unacked.clear();
if (!multiple)
{
if(_individual == null)
{
- _individual = new ArrayList<Long>();
+ _individual = new ArrayList<Long>();
}
//have acked a single message that is not part of
//the previously acked region so record
@@ -64,36 +68,29 @@
_deliveryTag = deliveryTag;
_multiple = true;
}
- _unacked.clear();
}
public void consolidate()
{
if(_unacked.isEmpty())
{
- consolidate(_unacked);
- }
-
- }
-
- private void consolidate(List<UnacknowledgedMessage> unacked)
- {
- //lookup all the unacked messages that have been acked in this transaction
- if (_multiple)
- {
- //get all the unacked messages for the accumulated
- //multiple acks
- _map.collect(_deliveryTag, true, unacked);
- }
- //get any unacked messages for individual acks outside the
- //range covered by multiple acks
- if(_individual != null)
- {
- for (Long tag : _individual)
+ //lookup all the unacked messages that have been acked in this transaction
+ if (_multiple)
{
- if(_deliveryTag < tag)
+ //get all the unacked messages for the accumulated
+ //multiple acks
+ _map.collect(_deliveryTag, true, _unacked);
+ }
+ if(_individual != null)
+ {
+ //get any unacked messages for individual acks outside the
+ //range covered by multiple acks
+ for (long tag : _individual)
{
- _map.collect(tag, false, unacked);
+ if(_deliveryTag < tag)
+ {
+ _map.collect(tag, false, _unacked);
+ }
}
}
}
@@ -101,12 +98,10 @@
public boolean checkPersistent() throws AMQException
{
-
-
consolidate();
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
if (msg.getMessage().isPersistent())
{
@@ -119,7 +114,7 @@
public void prepare(StoreContext storeContext) throws AMQException
{
//make persistent changes, i.e. dequeue and decrementReference
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
@@ -133,7 +128,7 @@
//in memory counter) so if we failed in prepare for full
//txn, this op will have to compensate by fixing the count
//in memory (persistent changes will be rolled back by store)
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
msg.getMessage().takeReference();
}
@@ -142,7 +137,7 @@
public void commit(StoreContext storeContext)
{
//remove the unacked messages from the channels map
- _map.remove(_unacked);
+ _map.remove(_unacked);
}
public void rollback(StoreContext storeContext)
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Thu Aug 14 20:40:49 2008
@@ -23,41 +23,42 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
public interface Visitor
{
/**
- * @param message the message being iterated over
- * @return true to stop iteration, false to continue
+ * @param deliveryTag
+ *@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(UnacknowledgedMessage message) throws AMQException;
+ boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
void visitComplete();
}
void visit(Visitor visitor) throws AMQException;
- Object getLock();
+ void add(long deliveryTag, QueueEntry message);
- void add(long deliveryTag, UnacknowledgedMessage message);
-
- void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+ void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
boolean contains(long deliveryTag) throws AMQException;
- void remove(List<UnacknowledgedMessage> msgs);
-
- UnacknowledgedMessage remove(long deliveryTag);
+ void remove(Map<Long,QueueEntry> msgs);
- void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException;
+ QueueEntry remove(long deliveryTag);
- Collection<UnacknowledgedMessage> cancelAllMessages();
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+
+ Collection<QueueEntry> cancelAllMessages();
void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
@@ -65,7 +66,7 @@
void clear();
- UnacknowledgedMessage get(long deliveryTag);
+ QueueEntry get(long deliveryTag);
/**
* Get the set of delivery tags that are outstanding.
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Thu Aug 14 20:40:49 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -28,6 +29,10 @@
import java.util.Set;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.TransactionalContext;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -36,7 +41,7 @@
private long _unackedSize;
- private Map<Long, UnacknowledgedMessage> _map;
+ private Map<Long, QueueEntry> _map;
private long _lastDeliveryTag;
@@ -45,10 +50,10 @@
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+ _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
}
- public void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
{
if (multiple)
{
@@ -56,7 +61,7 @@
}
else
{
- msgs.add(get(deliveryTag));
+ msgs.put(deliveryTag, get(deliveryTag));
}
}
@@ -69,26 +74,27 @@
}
}
- public void remove(List<UnacknowledgedMessage> msgs)
+ public void remove(Map<Long,QueueEntry> msgs)
{
synchronized (_lock)
{
- for (UnacknowledgedMessage msg : msgs)
+ for (Long deliveryTag : msgs.keySet())
{
- remove(msg.deliveryTag);
+ remove(deliveryTag);
}
}
}
- public UnacknowledgedMessage remove(long deliveryTag)
+ public QueueEntry remove(long deliveryTag)
{
synchronized (_lock)
{
- UnacknowledgedMessage message = _map.remove(deliveryTag);
+ QueueEntry message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
+
}
return message;
@@ -99,21 +105,16 @@
{
synchronized (_lock)
{
- Collection<UnacknowledgedMessage> currentEntries = _map.values();
- for (UnacknowledgedMessage msg : currentEntries)
+ Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, QueueEntry> entry : currentEntries)
{
- visitor.callback(msg);
+ visitor.callback(entry.getKey().longValue(), entry.getValue());
}
visitor.visitComplete();
}
}
- public Object getLock()
- {
- return _lock;
- }
-
- public void add(long deliveryTag, UnacknowledgedMessage message)
+ public void add(long deliveryTag, QueueEntry message)
{
synchronized (_lock)
{
@@ -123,12 +124,12 @@
}
}
- public Collection<UnacknowledgedMessage> cancelAllMessages()
+ public Collection<QueueEntry> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<UnacknowledgedMessage> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ Collection<QueueEntry> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -160,14 +161,15 @@
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+
{
synchronized (_lock)
{
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+ Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
while (it.hasNext())
{
- Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+ Map.Entry<Long, QueueEntry> unacked = it.next();
if (unacked.getKey() > deliveryTag)
{
@@ -176,10 +178,14 @@
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+ //Message has been acked, so discard it. This will dequeue it and decrement its reference count.
+ unacked.getValue().discard(storeContext);
+
it.remove();
+
_unackedSize -= unacked.getValue().getMessage().getSize();
- destination.add(unacked.getValue());
+
if (unacked.getKey() == deliveryTag)
{
break;
@@ -188,7 +194,7 @@
}
}
- public UnacknowledgedMessage get(long key)
+ public QueueEntry get(long key)
{
synchronized (_lock)
{
@@ -204,14 +210,14 @@
}
}
- private void collect(Long key, List<UnacknowledgedMessage> msgs)
+ private void collect(long key, Map<Long, QueueEntry> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
{
- msgs.add(entry.getValue());
- if (entry.getKey().equals(key))
+ msgs.put(entry.getKey(),entry.getValue());
+ if (entry.getKey() == key)
{
break;
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Aug 14 20:40:49 2008
@@ -30,11 +30,13 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -176,11 +178,22 @@
boolean durable = queueConfiguration.getBoolean("durable" ,false);
boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
String owner = queueConfiguration.getString("owner", null);
+ FieldTable arguments = null;
+ Integer priorities = queueConfiguration.getInteger("priorities", null);
+ if(priorities != null && priorities.intValue() > 1)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+ arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
+ }
+
- queue = new AMQQueue(queueName,
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName,
durable,
owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */, virtualHost);
+ autodelete /* Therefore autodelete makes no sense */, virtualHost, arguments);
if (queue.isDurable())
{
@@ -221,7 +234,7 @@
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- queue.bind(routingKey, null, exchange);
+ queue.bind(exchange, routingKey, null);
_logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
@@ -229,7 +242,7 @@
if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
{
- queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
+ queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Aug 14 20:40:49 2008
@@ -191,9 +191,7 @@
{
_exchangeMbean.unregister();
}
- }
-
- abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+ }
public String toString()
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Aug 14 20:40:49 2008
@@ -43,8 +43,8 @@
public DefaultExchangeFactory(VirtualHost host)
{
_host = host;
- registerExchangeType(DestNameExchange.TYPE);
- registerExchangeType(DestWildExchange.TYPE);
+ registerExchangeType(DirectExchange.TYPE);
+ registerExchangeType(TopicExchange.TYPE);
registerExchangeType(HeadersExchange.TYPE);
registerExchangeType(FanoutExchange.TYPE);
}
@@ -67,7 +67,7 @@
if (exchType == null)
{
- throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null);
+ throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete);
return e;