You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [2/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/010ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/010ExcludeList?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/010ExcludeList (original)
+++ incubator/qpid/branches/qpid.0-10/java/010ExcludeList Thu Aug 14 20:40:49 2008
@@ -4,6 +4,7 @@
 org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only
 org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
 org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
+org.apache.qpid.test.unit.ack.AcknowledgeTest#*
 // those tests need durable subscribe states to be persisted
 org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
 // those tests require broker recovery
@@ -32,6 +33,7 @@
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
 org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
 // Those tests are testing 0.8 specific semantics
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
@@ -45,5 +47,19 @@
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxP2P
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
+org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
 // the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+// 0-10 c++ broker doesn't implement virtual hosts, or those wackhy exchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
+// c++ broker doesn't do selectors, so this will fail
+org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue

Modified: incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store (original)
+++ incubator/qpid/branches/qpid.0-10/java/010ExcludeList-store Thu Aug 14 20:40:49 2008
@@ -5,6 +5,7 @@
 org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
 org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
 org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
+org.apache.qpid.test.unit.ack.AcknowledgeTest#*
 // those tests need durable subscribe states to be persisted
 org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
 org.apache.qpid.test.unit.ct.DurableSubscriberTest#testDurSubRestoresMessageSelector
@@ -27,6 +28,7 @@
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated
 org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser
 org.apache.qpid.test.testcases.FailoverTest#*
+org.apache.qpid.test.client.failover.FailoverTest#*
 // Those tests are testing 0.8 specific semantics
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P
 org.apache.qpid.test.testcases.ImmediateMessageTest#test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P
@@ -40,5 +42,16 @@
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxP2P
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub
 org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub
+org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testDefaultExchanges
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
 // the 0.10 c++ broker does not implement forget
-org.apache.qpid.test.unit.xa.FaultTest#testForget
\ No newline at end of file
+org.apache.qpid.test.unit.xa.FaultTest#testForget
+// the 0-10 c++ broker does not implement priority / this test depends on a Java broker extension for queue creation
+org.apache.qpid.server.queue.PriorityTest
+//this test checks explicitly for 0-8 flow control semantics
+org.apache.qpid.test.client.FlowControlTest
+// The default cpp.testprofile does not start the cpp broker with authentication so this test will fail.
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
+// c++ broker doesn't do selectors, so this will fail 
+org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue

Modified: incubator/qpid/branches/qpid.0-10/java/08ExcludeList
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/08ExcludeList?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/08ExcludeList (original)
+++ incubator/qpid/branches/qpid.0-10/java/08ExcludeList Thu Aug 14 20:40:49 2008
@@ -2,7 +2,8 @@
 org.apache.qpid.test.unit.xa.TopicTest#*
 org.apache.qpid.test.unit.xa.FaultTest#*
 org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurableWithInvalidSelector
 // Those tests are not finished
 org.apache.qpid.test.testcases.TTLTest#*
 org.apache.qpid.test.testcases.FailoverTest#*
+// This is a long running test so should exclude from normal runs
+org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover

Modified: incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,8 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.management.JMException;
 import javax.management.openmbean.OpenDataException;
@@ -34,7 +36,7 @@
 import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -191,7 +193,7 @@
         return false;
     }
 
-    public void route(AMQMessage payload) throws AMQException
+    public void route(IncomingMessage payload) throws AMQException
     {
         
         Long value = new Long(SizeOf.getUsedMemory());
@@ -201,17 +203,14 @@
         headers.put(key, value);
         ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-        
-        payload.enqueue(q);
+
+        ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();
+        queues.add(q);
+        payload.enqueue(queues);
         
     }
 
-	@Override
-	public Map<AMQShortString, List<AMQQueue>> getBindings() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
+	
 	public boolean isBound(AMQShortString routingKey, FieldTable arguments,
 			AMQQueue queue) {
 		// TODO Auto-generated method stub

Modified: incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Thu Aug 14 20:40:49 2008
@@ -1,24 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
 package org.apache.qpid.extras.exchanges.example;
 
 import java.util.List;
@@ -28,7 +7,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -102,7 +81,7 @@
     {
     }
 
-    public void route(AMQMessage message) throws AMQException
+    public void route(IncomingMessage message) throws AMQException
     {
     }
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-passwd Thu Aug 14 20:40:49 2008
@@ -18,6 +18,11 @@
 # under the License.
 #
 
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
 # Set classpath to include Qpid jar with all required jars in manifest
 QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server Thu Aug 14 20:40:49 2008
@@ -18,6 +18,11 @@
 # under the License.
 #
 
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
 # Set classpath to include Qpid jar with all required jars in manifest
 QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
 
@@ -26,6 +31,7 @@
        JAVA_VM=-server \
        JAVA_MEM=-Xmx1024m \
        JAVA_GC="-XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
-       QPID_CLASSPATH=$QPID_LIBS
+       QPID_CLASSPATH=$QPID_LIBS \
+       QPID_RUN_LOG=2
 
 . qpid-run org.apache.qpid.server.Main "$@"

Modified: incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/bin/qpid-server.bat Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
 @REM under the License.
 @REM
 
-echo off
+@echo off
 REM Script to run the Qpid Java Broker
 
 rem Guess QPID_HOME if not defined
@@ -36,35 +36,168 @@
 goto end
 :okHome
 
+REM set QPID_WORK if not set
+if not "%QPID_WORK%" == "" goto okQpidWork
+if "%HOME%" == "" goto noHome
+set QPID_WOKR=%HOME%
+goto okQpidWork
+
+:noHome
+set QPID_WORK=c:\Temp
+if not exist %QPID_WORK% md %QPID_WORK%
+:okQpidWork
+
 if not "%JAVA_HOME%" == "" goto gotJavaHome
 echo The JAVA_HOME environment variable is not defined
 echo This environment variable is needed to run this program
-goto exit
+goto end
 :gotJavaHome
 if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
 goto okJavaHome
 :noJavaHome
 echo The JAVA_HOME environment variable is not defined correctly
 echo This environment variable is needed to run this program.
-goto exit
+goto end
 :okJavaHome
 
-rem Slurp the command line arguments. This loop allows for an unlimited number
-rem of agruments (up to the command line limit, anyway).
-set QPID_ARGS=%1
-if ""%1""=="""" goto runCommand
-shift
-:loop
-if ""%1""=="""" goto runCommand
-set QPID_ARGS=%QPID_ARGS% %1
+REM set loggin level if not set
+if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info
+
+REM Set the default system properties that we'll use now that they have
+REM all been initialised
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME=%QPID_HOME% -DQPID_WORK=%QPID_WORK%
+
+if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
+
+REM Use QPID_CLASSPATH if set
+if "%QPID_CLASSPATH%" == "" goto noQpidClasspath
+set CLASSPATH=%QPID_CLASSPATH%
+echo Using CLASSPATH: %CLASSPATH%
+goto afterQpidClasspath
+
+:noQpidClasspath
+echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-incubating.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-incubating.jar
+:afterQpidClasspath
+
+REM start parsing -run arguments
+set QPID_ARGS=
+if "%1" == "" goto endRunArgs
+:runLoop
+set var=%1
+if "%var:~0,5%" == "-run:" goto runFound
+set QPID_ARGS=%QPID_ARGS% %1 
+:beforeRunShift
 shift
-goto loop
+if not "%1"=="" goto runLoop
+goto endRunArgs
+
+:runFound
+if "%var%" == "-run:debug" goto runDebug
+if "%var%" == "-run:jpda" goto runJdpa
+if "%var:~0,24%" == "-run:external-classpath-" goto runExternalClasspath
+if "%var%" == "-run:print-classpath" goto runPrintCP
+if "%var%" == "-run:help" goto runHelp
+echo "unrecognized -run option '%var%'. For using external classpaths use -run:external-classpath-option"
+goto end
+  
+:runDebug
+REM USAGE: print the classpath and command before running it
+set debug=true
+goto beforeRunShift
+
+:runJdpa
+REM USAGE: adds debugging options to the java command, use
+REM USAGE: JDPA_TRANSPORT and JPDA_ADDRESS to customize the debugging
+REM USAGE: behavior and use JPDA_OPTS to override it entirely
+if "%JPDA_OPTS%" == "" goto beforeRunShift
+if "%JPDA_TRANSPORT%" == "" set JPDA_TRANSPORT=-dt_socket
+if "%JPDA_ADDRESS%" == "" set JPDA_ADDRESS=8000
+set JPDA_OPTS="-Xdebug -Xrunjdwp:transport=%JPDA_TRANSPORT%,address=%JPDA_ADDRESS%,server=y,suspend=n"
+set QPID_OPTS="%QPID_OPTS% %JPDA_OPTS%"
+goto beforeRunShift
+
+:runExternalClasspath
+echo Using external classpath %var%
+REM USAGE: Format is -run:external-classpath-first/last/ignore/only as equals special in DOS
+REM USAGE: controls how the CLASSPATH environment variable is used by
+REM USAGE: this script, value can be one of ignore (the default), first,
+REM USAGE: last, and only
+if "%var%" == "-run:external-classpath-ignore" goto beforeRunShift
+if "%var%" == "-run:external-classpath-first" goto extCPFirst
+if "%var%" == "-run:external-classpath-last" goto extCPLast
+if "%var%" == "-run:external-classpath-only" goto extCPOnly
+echo Invalid value provided for external classpath.
+goto end
+
+:extCPFirst
+set CLASSPATH=%EXTERNAL_CLASSPATH%;%CLASSPATH%
+goto beforeRunShift
+
+:extCPLast
+set CLASSPATH=%CLASSPATH%;%EXTERNAL_CLASSPATH%
+goto beforeRunShift
+
+:extCPonly
+set CLASSPATH=%EXTERNAL_CLASSPATH%
+goto beforeRunShift
+
+:runPrintCP
+REM USAGE: print the classpath
+echo %CLASSPATH%
+goto beforeRunShift
+
+:runHelp
+REM USAGE: print this message
+echo -------------------------------------------------------------------------------------------
+echo -run:option where option can be the following.
+echo debug : Prints classpath and command before running it
+echo jpda : Adds remote debugging info using JPDA_OPTS. Use JPDA_TRANSPORT and JPDA_ADDRESS to
+echo        customize, JPDA_OPTS to override
+echo external-classpath : Valid values are: ignore, first, last and only.
+echo print-classpath : Prints classpath before running command
+echo help : Prints this message
+echo --------------------------------------------------------------------------------------------
+goto end
+
+REM end parsing -run arguments
+:endRunArgs
+
+set JAVA_VM=-server
+set JAVA_MEM=-Xmx1024m
+set JAVA_GC=-XX:+UseConcMarkSweepGC 
+rem removing the following vm arg from JAVA_GC  as it is supported on ly in Java 1.6
+rem -XX:+HeapDumpOnOutOfMemoryError"
+
+REM Use QPID_JAVA_GC if set
+if "%QPID_JAVA_GC%" == "" goto noQpidJavaGC
+set JAVA_GC=%QPID_JAVA_GC%
+echo Using QPID_JAVA_GC setting: %QPID_JAVA_GC%
+goto afteQpidJavaGC
+
+:noQPidJavaGC
+echo Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC %JAVA_GC%
+:afterQpidJavaGC
+
+REM Use QPID_JAVA_MEM if set
+if "%QPID_JAVA_MEM%" == "" goto noQpidJavaMem
+set JAVA_MEM=%QPID_JAVA_MEM%
+echo Using QPID_JAVA_MEM setting: %QPID_JAVA_MEM%
+goto afterQpidJavaMem
+
+:noQpidJavaMem
+echo Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM %JAVA_MEM%
+:after QpidJavaMem
+
 
 rem QPID_OPTS intended to hold any -D props for use
 rem user must enclose any value for QPID_OPTS in double quotes
 :runCommand
-set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
 set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%\bin\java" -server -Xmx1024m %QPID_OPTS% -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+set COMMAND="%JAVA_HOME%\bin\java" %JAVA_VM% %JAVA_MEM% %JAVA_GC% %QPID_OPTS% %SYSTEM_PROPS% -cp "%CLASSPATH%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+
+if "%debug%" == "true" echo %CLASSPATH%;%LAUNCH_JAR%;%MODULE_JARS%
+if "%debug%" == "true" echo %COMMAND%
+%COMMAND%
 
 :end

Modified: incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/etc/acl.config.xml Thu Aug 14 20:40:49 2008
@@ -93,7 +93,7 @@
                 <queues>
                     <exchange>amq.direct</exchange>
                     <!-- 4Mb -->
-                    <maximumQueueDepth>4235g264</maximumQueueDepth>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>
                     <!-- 2Mb -->
                     <maximumMessageSize>2117632</maximumMessageSize>
                     <!-- 10 mins -->

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/grammar/SelectorParser.jj Thu Aug 14 20:40:49 2008
@@ -94,7 +94,7 @@
             return this.JmsSelector();
         }
         catch (Throwable e) {
-	        throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql, e);
+	        throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql,e);
         }
 
     }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Aug 14 20:40:49 2008
@@ -56,8 +56,9 @@
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.management.ManagedBroker;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -175,7 +176,8 @@
                 ownerShortString = new AMQShortString(owner);
             }
 
-            queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
+            queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost(),
+                                                       null);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
                 _messageStore.createQueue(queue);
@@ -220,7 +222,7 @@
         try
         {
             queue.delete();
-            _messageStore.removeQueue(new AMQShortString(queueName));
+            _messageStore.removeQueue(queue);
 
         }
         catch (AMQException ex)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Aug 14 20:40:49 2008
@@ -21,7 +21,6 @@
 package org.apache.qpid.server;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
@@ -30,14 +29,23 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -45,13 +53,13 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQChannel
 {
@@ -61,13 +69,8 @@
 
     private final int _channelId;
 
-    // private boolean _transactional;
-
-    private long _prefetch_HighWaterMark;
 
-    private long _prefetch_LowWaterMark;
-
-    private long _prefetchSize;
+    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
 
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -86,10 +89,11 @@
      * been received by this channel. As the frames are received the message gets updated and once all frames have been
      * received the message can then be routed.
      */
-    private AMQMessage _currentMessage;
+    private IncomingMessage _currentMessage;
+
+    /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
+    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
 
-    /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final MessageStore _messageStore;
 
@@ -97,7 +101,7 @@
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
-    private TransactionalContext _txnContext, _nonTransactedContext;
+    private TransactionalContext _txnContext;
 
     /**
      * A context used by the message store enabling it to track context for a given channel even across thread
@@ -109,8 +113,6 @@
 
     private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
 
-    private Set<Long> _browsedAcks = new HashSet<Long>();
-
     // Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;
     private boolean _closing;
@@ -118,7 +120,7 @@
     @Configured(path = "advanced.enableJMSXUserID",
                 defaultValue = "false")
     public boolean ENABLE_JMSXUserID;
-
+    
 
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
@@ -129,8 +131,8 @@
         _session = session;
         _channelId = channelId;
         _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
-        _prefetch_HighWaterMark = DEFAULT_PREFETCH;
-        _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
+
+
         _messageStore = messageStore;
 
         // by default the session is non-transactional
@@ -140,7 +142,7 @@
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
-        _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+        _txnContext = new LocalTransactionalContext(this);
     }
 
     public boolean isTransactional()
@@ -156,55 +158,15 @@
         return _channelId;
     }
 
-    public long getPrefetchCount()
-    {
-        return _prefetch_HighWaterMark;
-    }
-
-    public void setPrefetchCount(long prefetchCount)
-    {
-        _prefetch_HighWaterMark = prefetchCount;
-    }
-
-    public long getPrefetchSize()
-    {
-        return _prefetchSize;
-    }
-
-    public void setPrefetchSize(long prefetchSize)
-    {
-        _prefetchSize = prefetchSize;
-    }
-
-    public long getPrefetchLowMarkCount()
-    {
-        return _prefetch_LowWaterMark;
-    }
-
-    public void setPrefetchLowMarkCount(long prefetchCount)
-    {
-        _prefetch_LowWaterMark = prefetchCount;
-    }
-
-    public long getPrefetchHighMarkCount()
-    {
-        return _prefetch_HighWaterMark;
-    }
-
-    public void setPrefetchHighMarkCount(long prefetchCount)
-    {
-        _prefetch_HighWaterMark = prefetchCount;
-    }
-
-    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
+    public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
     {
 
-        _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
-        _currentMessage.setPublisher(publisher);
+        _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
+        _currentMessage.setMessageStore(_messageStore);
         _currentMessage.setExchange(e);
     }
 
-    public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+    public void publishContentHeader(ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
         if (_currentMessage == null)
@@ -215,7 +177,7 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
+                _log.debug("Content header received on channel " + _channelId);
             }
 
             if (ENABLE_JMSXUserID)
@@ -225,25 +187,48 @@
                 //fixme: fudge for QPID-677
                 properties.getHeaders().keySet();
 
-                properties.setUserId(protocolSession.getAuthorizedID().getName());
+                properties.setUserId(_session.getAuthorizedID().getName());
             }
 
             _currentMessage.setContentHeaderBody(contentHeaderBody);
+
             _currentMessage.setExpiration();
 
             routeCurrentMessage();
-            _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
 
-            // check and deliver if header says body length is zero
-            if (contentHeaderBody.bodySize == 0)
+            _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+
+            deliverCurrentMessageIfComplete();
+
+        }
+    }
+
+    private void deliverCurrentMessageIfComplete()
+            throws AMQException
+    {
+        // check and deliver if header says body length is zero
+        if (_currentMessage.allContentReceived())
+        {
+            try
             {
-                _txnContext.messageProcessed(protocolSession);
+                _currentMessage.deliverToQueues();
+            }
+            catch (NoRouteException e)
+            {
+                _returnMessages.add(e);
+            }
+            finally
+            {
+                // callback to allow the context to do any post message processing
+                // primary use is to allow message return processing in the non-tx case
+                _txnContext.messageProcessed(_session);
                 _currentMessage = null;
             }
         }
+
     }
 
-    public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
+    public void publishContentBody(ContentBody contentBody) throws AMQException
     {
         if (_currentMessage == null)
         {
@@ -260,15 +245,11 @@
 
             // returns true iff the message was delivered (i.e. if all data was
             // received
-            if (_currentMessage.addContentBodyFrame(_storeContext,
-                        protocolSession.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
-                            contentBody)))
-            {
-                // callback to allow the context to do any post message processing
-                // primary use is to allow message return processing in the non-tx case
-                _txnContext.messageProcessed(protocolSession);
-                _currentMessage = null;
-            }
+            _currentMessage.addContentBodyFrame(
+                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+                            contentBody));
+
+            deliverCurrentMessageIfComplete();
         }
         catch (AMQException e)
         {
@@ -287,6 +268,7 @@
         }
         catch (NoRouteException e)
         {
+            //_currentMessage.incrementReference();
             _returnMessages.add(e);
         }
     }
@@ -307,18 +289,17 @@
      *
      * @param tag       the tag chosen by the client (if null, server will generate one)
      * @param queue     the queue to subscribe to
-     * @param session   the protocol session of the subscriber
-     * @param noLocal   Flag stopping own messages being receivied.
-     * @param exclusive Flag requesting exclusive access to the queue
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
+     * @param noLocal   Flag stopping own messages being receivied.
+     * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
-    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
                                            FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
@@ -326,77 +307,65 @@
             tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
 
-        if (_consumerTag2QueueMap.containsKey(tag))
+        if (_tag2SubscriptionMap.containsKey(tag))
         {
             throw new ConsumerTagNotUniqueException();
         }
 
+         Subscription subscription =
+                SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+
+
+        // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
         // We add before we register as the Async Delivery process may AutoClose the subscriber
         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
-        _consumerTag2QueueMap.put(tag, queue);
+
+        _tag2SubscriptionMap.put(tag, subscription);
 
         try
         {
-            queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+            queue.registerSubscription(subscription, exclusive);
         }
         catch (AMQException e)
         {
-            _consumerTag2QueueMap.remove(tag);
+            _tag2SubscriptionMap.remove(tag);
             throw e;
         }
-
         return tag;
     }
 
     /**
      * Unsubscribe a consumer from a queue.
-     * @param session
      * @param consumerTag
      * @return true if the consumerTag had a mapped queue that could be unregistered.
      * @throws AMQException
      */
-    public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+    public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
     {
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
-            _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-            {
-
-                public boolean callback(UnacknowledgedMessage message) throws AMQException
-                {
-                    _log.debug(message);
-
-                    return true;
-                }
 
-                public void visitComplete()
-                {
-                }
-            });
-        }
-
-        AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
-        if (q != null)
+        Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+        if (sub != null)
         {
-            q.unregisterProtocolSession(session, _channelId, consumerTag);
+            sub.getQueue().unregisterSubscription(sub);
             return true;
         }
+        else
+        {
+            _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+        }
         return false;
     }
 
     /**
      * Called from the protocol session to close this channel and clean up. T
      *
-     * @param session The session to close
-     *
      * @throws AMQException if there is an error during closure
      */
-    public void close(AMQProtocolSession session) throws AMQException
+    public void close() throws AMQException
     {
         _txnContext.rollback();
-        unsubscribeAllConsumers(session);
+        unsubscribeAllConsumers();
         try
         {
             requeue();
@@ -414,11 +383,11 @@
         _closing = closing;
     }
 
-    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+    private void unsubscribeAllConsumers() throws AMQException
     {
         if (_log.isInfoEnabled())
         {
-            if (!_consumerTag2QueueMap.isEmpty())
+            if (!_tag2SubscriptionMap.isEmpty())
             {
                 _log.info("Unsubscribing all consumers on channel " + toString());
             }
@@ -428,17 +397,19 @@
             }
         }
 
-        for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+        for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
         {
             if (_log.isInfoEnabled())
             {
                 _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+            Subscription sub = me.getValue();
+
+            sub.getQueue().unregisterSubscription(sub);
         }
 
-        _consumerTag2QueueMap.clear();
+        _tag2SubscriptionMap.clear();
     }
 
     /**
@@ -447,9 +418,9 @@
      * @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
-     * @param consumerTag The tag for the consumer that is to acknowledge this message.
+     * @param subscription The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
+    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
     {
         if (_log.isDebugEnabled())
         {
@@ -462,16 +433,13 @@
                 if (_log.isDebugEnabled())
                 {
                     _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
-                               + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
+                               + ") with a queue(" + entry.getQueue() + ") for " + subscription);
                 }
             }
         }
 
-        synchronized (_unacknowledgedMessageMap.getLock())
-        {
-            _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap));
-            checkSuspension();
-        }
+        _unacknowledgedMessageMap.add(deliveryTag, entry);
+
     }
 
     private final String id = "(" + System.identityHashCode(this) + ")";
@@ -490,7 +458,7 @@
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+        Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
         // Deliver these messages out of the transaction as their delivery was never
         // part of the transaction only the receive.
@@ -505,13 +473,9 @@
 
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-                // if (_nonTransactedContext == null)
-                {
-                    _nonTransactedContext =
-                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-                }
 
-                deliveryContext = _nonTransactedContext;
+                    deliveryContext =
+                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
             }
             else
             {
@@ -519,22 +483,23 @@
             }
         }
 
-        for (UnacknowledgedMessage unacked : messagesToBeDelivered)
+        for (QueueEntry unacked : messagesToBeDelivered)
         {
             if (!unacked.isQueueDeleted())
             {
-                // Ensure message is released for redelivery
-                unacked.entry.release();
-
                 // Mark message redelivered
                 unacked.getMessage().setRedelivered(true);
 
+                // Ensure message is released for redelivery
+                unacked.release();
+
                 // Deliver Message
-                deliveryContext.deliver(unacked.entry, false);
+                deliveryContext.requeue(unacked);
 
-                // Should we allow access To the DM to directy deliver the message?
-                // As we don't need to check for Consumers or worry about incrementing the message count?
-                // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+            }
+            else
+            {
+                unacked.discard(_storeContext);
             }
         }
 
@@ -549,32 +514,29 @@
      */
     public void requeue(long deliveryTag) throws AMQException
     {
-        UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+        QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
         if (unacked != null)
         {
+            // Mark message redelivered
+            unacked.getMessage().setRedelivered(true);
 
             // Ensure message is released for redelivery
             if (!unacked.isQueueDeleted())
             {
-                unacked.entry.release();
+                unacked.release();
             }
 
-            // Mark message redelivered
-            unacked.getMessage().setRedelivered(true);
 
             // Deliver these messages out of the transaction as their delivery was never
             // part of the transaction only the receive.
             TransactionalContext deliveryContext;
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-                // if (_nonTransactedContext == null)
-                {
-                    _nonTransactedContext =
+
+                deliveryContext =
                             new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-                }
 
-                deliveryContext = _nonTransactedContext;
             }
             else
             {
@@ -584,7 +546,7 @@
             if (!unacked.isQueueDeleted())
             {
                 // Redeliver the messages to the front of the queue
-                deliveryContext.deliver(unacked.entry, true);
+                deliveryContext.requeue(unacked);
                 // Deliver increments the message count but we have already deliverted this once so don't increment it again
                 // this was because deliver did an increment changed this.
             }
@@ -592,11 +554,8 @@
             {
                 _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
-                // _log.error("Requested requeue of message:" + deliveryTag +
-                // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
-                //
-                // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
-                //
+
+                unacked.discard(_storeContext);
             }
         }
         else
@@ -604,25 +563,6 @@
             _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
                       + _unacknowledgedMessageMap.size());
 
-            if (_log.isDebugEnabled())
-            {
-                _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-                {
-                    int count = 0;
-
-                    public boolean callback(UnacknowledgedMessage message) throws AMQException
-                    {
-                        _log.debug(
-                                (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
-
-                        return false; // Continue
-                    }
-
-                    public void visitComplete()
-                    {
-                    }
-                });
-            }
         }
 
     }
@@ -636,8 +576,10 @@
      */
     public void resend(final boolean requeue) throws AMQException
     {
-        final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
-        final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+
+        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
 
         if (_log.isDebugEnabled())
         {
@@ -647,23 +589,25 @@
         // Process the Unacked-Map.
         // Marking messages who still have a consumer for to be resent
         // and those that don't to be requeued.
+
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            public boolean callback(UnacknowledgedMessage message) throws AMQException
+            public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
             {
-                AMQShortString consumerTag = message.consumerTag;
+
                 AMQMessage msg = message.getMessage();
                 msg.setRedelivered(true);
-                if (consumerTag != null)
+                final Subscription subscription = message.getDeliveredSubscription();
+                if (subscription != null)
                 {
                     // Consumer exists
-                    if (_consumerTag2QueueMap.containsKey(consumerTag))
+                    if (!subscription.isClosed())
                     {
-                        msgToResend.add(message);
+                        msgToResend.put(deliveryTag, message);
                     }
                     else // consumer has gone
                     {
-                        msgToRequeue.add(message);
+                        msgToRequeue.put(deliveryTag, message);
                     }
                 }
                 else
@@ -675,7 +619,7 @@
                     {
                         if (requeue)
                         {
-                            msgToRequeue.add(message);
+                            msgToRequeue.put(deliveryTag, message);
                         }
                         else
                         {
@@ -684,7 +628,8 @@
                     }
                     else
                     {
-                        _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+                        message.discard(_storeContext);
+                        _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
                     }
                 }
 
@@ -697,6 +642,8 @@
             }
         });
 
+        _unacknowledgedMessageMap.clear();
+
         // Process Messages to Resend
         if (_log.isDebugEnabled())
         {
@@ -710,9 +657,15 @@
             }
         }
 
-        for (UnacknowledgedMessage message : msgToResend)
+        for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
         {
+            QueueEntry message = entry.getValue();
+            long deliveryTag = entry.getKey();
+
+
+
             AMQMessage msg = message.getMessage();
+            AMQQueue queue = message.getQueue();
 
             // Our Java Client will always suspend the channel when resending!
             // If the client has requested the messages be resent then it is
@@ -727,46 +680,20 @@
             // else
             // {
             // release to allow it to be delivered
-            message.entry.release();
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             msg.setRedelivered(true);
 
-            Subscription sub = message.entry.getDeliveredSubscription();
+            Subscription sub = message.getDeliveredSubscription();
 
             if (sub != null)
             {
-                // Get the lock so we can tell if the sub scription has closed.
-                // will stop delivery to this subscription until the lock is released.
-                // note: this approach would allow the use of a single queue if the
-                // PreDeliveryQueue would allow head additions.
-                // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
-                // needs guidance from AMQP WG Model SIG
-                synchronized (sub.getSendLock())
+                
+                if(!queue.resend(message, sub))
                 {
-                    if (sub.isClosed())
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Subscription(" + System.identityHashCode(sub)
-                                       + ") closed during resend so requeuing message");
-                        }
-                        // move this message to requeue
-                        msgToRequeue.add(message);
-                    }
-                    else
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
-                                       + System.identityHashCode(sub));
-                        }
-
-                        sub.addToResendQueue(message.entry);
-                        _unacknowledgedMessageMap.remove(message.deliveryTag);
-                    }
-                } // sync(sub.getSendLock)
+                    msgToRequeue.put(deliveryTag, message);
+                }
             }
             else
             {
@@ -777,7 +704,7 @@
                               + ")to prevent loss");
                 }
                 // move this message to requeue
-                msgToRequeue.add(message);
+                msgToRequeue.put(deliveryTag, message);
             }
         } // for all messages
         // } else !isSuspend
@@ -795,13 +722,9 @@
         TransactionalContext deliveryContext;
         if (!(_txnContext instanceof NonTransactionalContext))
         {
-            if (_nonTransactedContext == null)
-            {
-                _nonTransactedContext =
-                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-            }
 
-            deliveryContext = _nonTransactedContext;
+            deliveryContext =
+                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
         }
         else
         {
@@ -809,14 +732,17 @@
         }
 
         // Process Messages to Requeue at the front of the queue
-        for (UnacknowledgedMessage message : msgToRequeue)
+        for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
         {
-            message.entry.release();
-            message.entry.setRedelivered(true);
+            QueueEntry message = entry.getValue();
+            long deliveryTag = entry.getKey();
+            
+            message.release();
+            message.setRedelivered(true);
 
-            deliveryContext.deliver(message.entry, true);
+            deliveryContext.requeue(message);
 
-            _unacknowledgedMessageMap.remove(message.deliveryTag);
+            _unacknowledgedMessageMap.remove(deliveryTag);
         }
     }
 
@@ -827,38 +753,47 @@
      *
      * @param queue the queue that has been deleted
      *
-     * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
      */
-    public void queueDeleted(final AMQQueue queue) throws AMQException
+ /*   public void queueDeleted(final AMQQueue queue)
     {
-        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        try
         {
-            public boolean callback(UnacknowledgedMessage message) throws AMQException
+            _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
             {
-                if (message.getQueue() == queue)
+                public boolean callback(UnacknowledgedMessage message)
                 {
-                    try
+                    if (message.getQueue() == queue)
                     {
-                        message.discard(_storeContext);
-                        message.setQueueDeleted(true);
+                        try
+                        {
+                            message.discard(_storeContext);
+                            message.setQueueDeleted(true);
 
+                        }
+                        catch (AMQException e)
+                        {
+                            _log.error(
+                                    "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
+                            throw new RuntimeException(e);
+                        }
                     }
-                    catch (AMQException e)
-                    {
-                        _log.error(
-                                "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
-                    }
+
+                    return false;
                 }
 
-                return false;
-            }
+                public void visitComplete()
+                {
+                }
+            });
+        }
+        catch (AMQException e)
+        {
+            _log.error("Unexpected Error while handling deletion of queue", e);
+            throw new RuntimeException(e);
+        }
 
-            public void visitComplete()
-            {
-            }
-        });
     }
-
+*/
     /**
      * Acknowledge one or more messages.
      *
@@ -870,23 +805,7 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        synchronized (_unacknowledgedMessageMap.getLock())
-        {
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
-            }
-
-            _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
-
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
-            }
-
-        }
-
-        checkSuspension();
+        _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
     }
 
     /**
@@ -899,43 +818,22 @@
         return _unacknowledgedMessageMap;
     }
 
-    private void checkSuspension()
-    {
-        boolean suspend;
-
-        suspend =
-                ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
-                || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
-
-        setSuspended(suspend);
-    }
 
     public void setSuspended(boolean suspended)
     {
-        boolean isSuspended = _suspended.get();
 
-        if (isSuspended && !suspended)
-        {
-            // Continue being suspended if we are above the _prefetch_LowWaterMark
-            suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
-        }
 
         boolean wasSuspended = _suspended.getAndSet(suspended);
         if (wasSuspended != suspended)
         {
             if (wasSuspended)
             {
-                _log.debug("Unsuspending channel " + this);
                 // may need to deliver queued messages
-                for (AMQQueue q : _consumerTag2QueueMap.values())
+                for (Subscription s : _tag2SubscriptionMap.values())
                 {
-                    q.deliverAsync();
+                    s.getQueue().deliverAsync(s);
                 }
             }
-            else
-            {
-                _log.debug("Suspending channel " + this);
-            }
         }
     }
 
@@ -961,12 +859,7 @@
 
     public String toString()
     {
-        StringBuilder sb = new StringBuilder(30);
-        sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
-        sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
-        sb.append("/").append(_prefetch_HighWaterMark);
-
-        return sb.toString();
+        return "["+_session.toString()+":"+_channelId+"]";
     }
 
     public void setDefaultQueue(AMQQueue queue)
@@ -984,14 +877,14 @@
         return _storeContext;
     }
 
-    public void processReturns(AMQProtocolSession session) throws AMQException
+    public void processReturns() throws AMQException
     {
         if (!_returnMessages.isEmpty())
         {
             for (RequiredDeliveryException bouncedMessage : _returnMessages)
             {
                 AMQMessage message = bouncedMessage.getAMQMessage();
-                session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+                _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
                                                                  new AMQShortString(bouncedMessage.getMessage()));
 
                 message.decrementReference(_storeContext);
@@ -1001,40 +894,68 @@
         }
     }
 
-    public boolean wouldSuspend(AMQMessage msg)
+
+    public TransactionalContext getTransactionalContext()
     {
-        if (isSuspended())
-        {
-            return true;
-        }
-        else
-        {
-            boolean willSuspend =
-                    ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
-            if (!willSuspend)
-            {
-                final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+        return _txnContext;
+    }
 
-                willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
-            }
+    public boolean isClosing()
+    {
+        return _closing;
+    }
 
-            if (willSuspend)
-            {
-                setSuspended(true);
-            }
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _session;
+    }
 
-            return willSuspend;
-        }
+    public FlowCreditManager getCreditManager()
+    {
+        return _creditManager;
+    }
 
+    public void setCredit(final long prefetchSize, final int prefetchCount)
+    {
+        _creditManager.setCreditLimits(prefetchSize, prefetchCount);
     }
 
-    public TransactionalContext getTransactionalContext()
+    public List<RequiredDeliveryException> getReturnMessages()
     {
-        return _txnContext;
+        return _returnMessages;
     }
 
-    public boolean isClosing()
+    public MessageStore getMessageStore()
     {
-        return _closing;
+        return _messageStore;
+    }
+
+    private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
+        {
+
+            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+                    throws AMQException
+            {
+               getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+            }
+        };
+
+    public ClientDeliveryMethod getClientDeliveryMethod()
+    {
+        return _clientDeliveryMethod;
+    }
+
+    private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            {
+                addUnacknowledgedMessage(entry, deliveryTag, sub);
+            }
+        };
+
+    public RecordDeliveryMethod getRecordDeliveryMethod()
+    {
+        return _recordDeliveryMethod;
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Aug 14 20:40:49 2008
@@ -34,7 +34,6 @@
 import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.common.FixedSizeByteBufferAllocator;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -279,6 +278,12 @@
             ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
         }
 
+
+        if(connectorConfig.useBiasedWrites)
+        {
+            System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+        }
+
         int port = connectorConfig.port;
 
         String portStr = commandLine.getOptionValue("p");
@@ -415,7 +420,8 @@
                     bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
                 }
 
-                acceptor.bind(bindAddress, handler, sconfig);
+                bind(acceptor, bindAddress, handler, sconfig);
+
                 //fixme  qpid.AMQP should be using qpidproperties to get value
                 _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
             }
@@ -426,7 +432,8 @@
                 try
                 {
 
-                    acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+                    bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+
                     //fixme  qpid.AMQP should be using qpidproperties to get value
                     _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
 
@@ -449,6 +456,23 @@
         }
     }
 
+    /**
+     * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
+     *
+     * @param acceptor
+     * @param bindAddress
+     * @param handler
+     * @param sconfig
+     *
+     * @throws IOException from the acceptor.bind command
+     */
+    private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
+    {
+        acceptor.bind(bindAddress, handler, sconfig);
+
+        ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
+    }
+
     public static void main(String[] args)
     {
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Thu Aug 14 20:40:49 2008
@@ -39,19 +39,30 @@
  */
 public abstract class RequiredDeliveryException extends AMQException
 {
-    private final AMQMessage _amqMessage;
+    private AMQMessage _amqMessage;
 
     public RequiredDeliveryException(String message, AMQMessage payload)
     {
         super(message);
 
+        setMessage(payload);
+    }
+
+
+    public RequiredDeliveryException(String message)
+    {
+        super(message);
+    }
+
+    public void setMessage(final AMQMessage payload)
+    {
+
         // Increment the reference as this message is in the routing phase
         // and so will have the ref decremented as routing fails.
         // we need to keep this message around so we can return it in the
         // handler. So increment here.
         _amqMessage = payload.takeReference();
 
-        // payload.incrementReference();
     }
 
     public AMQMessage getAMQMessage()

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,14 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.queue.QueueEntry;
 
 /**
  * A TxnOp implementation for handling accumulated acks
@@ -34,7 +37,7 @@
 public class TxAck implements TxnOp
 {
     private final UnacknowledgedMessageMap _map;
-    private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>();
+    private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
     private List<Long> _individual;
     private long _deliveryTag;
     private boolean _multiple;
@@ -46,11 +49,12 @@
 
     public void update(long deliveryTag, boolean multiple)
     {
+        _unacked.clear();
         if (!multiple)
         {
             if(_individual == null)
             {
-                 _individual = new ArrayList<Long>();
+                _individual = new ArrayList<Long>();
             }
             //have acked a single message that is not part of
             //the previously acked region so record
@@ -64,36 +68,29 @@
             _deliveryTag = deliveryTag;
             _multiple = true;
         }
-        _unacked.clear();
     }
 
     public void consolidate()
     {
         if(_unacked.isEmpty())
         {
-            consolidate(_unacked);
-        }
-
-    }
-
-    private void consolidate(List<UnacknowledgedMessage> unacked)
-    {
-        //lookup all the unacked messages that have been acked in this transaction
-        if (_multiple)
-        {
-            //get all the unacked messages for the accumulated
-            //multiple acks
-            _map.collect(_deliveryTag, true, unacked);
-        }
-        //get any unacked messages for individual acks outside the
-        //range covered by multiple acks
-        if(_individual != null)
-        {
-            for (Long tag : _individual)
+            //lookup all the unacked messages that have been acked in this transaction
+            if (_multiple)
             {
-                if(_deliveryTag < tag)
+                //get all the unacked messages for the accumulated
+                //multiple acks
+                _map.collect(_deliveryTag, true, _unacked);
+            }
+            if(_individual != null)
+            {
+                //get any unacked messages for individual acks outside the
+                //range covered by multiple acks
+                for (long tag : _individual)
                 {
-                    _map.collect(tag, false, unacked);
+                    if(_deliveryTag < tag)
+                    {
+                        _map.collect(tag, false, _unacked);
+                    }
                 }
             }
         }
@@ -101,12 +98,10 @@
 
     public boolean checkPersistent() throws AMQException
     {
-
-
         consolidate();
         //if any of the messages in unacked are persistent the txn
         //buffer must be marked as persistent:
-        for (UnacknowledgedMessage msg : _unacked)
+        for (QueueEntry msg : _unacked.values())
         {
             if (msg.getMessage().isPersistent())
             {
@@ -119,7 +114,7 @@
     public void prepare(StoreContext storeContext) throws AMQException
     {
         //make persistent changes, i.e. dequeue and decrementReference
-        for (UnacknowledgedMessage msg : _unacked)
+        for (QueueEntry msg : _unacked.values())
         {
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(storeContext);
@@ -133,7 +128,7 @@
         //in memory counter) so if we failed in prepare for full
         //txn, this op will have to compensate by fixing the count
         //in memory (persistent changes will be rolled back by store)
-        for (UnacknowledgedMessage msg : _unacked)
+        for (QueueEntry msg : _unacked.values())
         {
             msg.getMessage().takeReference();
         }
@@ -142,7 +137,7 @@
     public void commit(StoreContext storeContext)
     {
         //remove the unacked messages from the channels map
-        _map.remove(_unacked);
+        _map.remove(_unacked);        
     }
 
     public void rollback(StoreContext storeContext)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Thu Aug 14 20:40:49 2008
@@ -23,41 +23,42 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
 
 public interface UnacknowledgedMessageMap
 {
     public interface Visitor
     {
         /**
-         * @param message the message being iterated over
-         * @return true to stop iteration, false to continue
+         * @param deliveryTag
+         *@param message the message being iterated over @return true to stop iteration, false to continue
          * @throws AMQException
          */
-        boolean callback(UnacknowledgedMessage message) throws AMQException;
+        boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
 
         void visitComplete();
     }
 
     void visit(Visitor visitor) throws AMQException;
 
-    Object getLock();
+    void add(long deliveryTag, QueueEntry message);
 
-    void add(long deliveryTag, UnacknowledgedMessage message);
-
-    void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+    void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
 
     boolean contains(long deliveryTag) throws AMQException;
 
-    void remove(List<UnacknowledgedMessage> msgs);
-
-    UnacknowledgedMessage remove(long deliveryTag);
+    void remove(Map<Long,QueueEntry> msgs);
 
-    void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException;
+    QueueEntry remove(long deliveryTag);
 
-    Collection<UnacknowledgedMessage> cancelAllMessages();
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+    
+    Collection<QueueEntry> cancelAllMessages();
 
     void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
 
@@ -65,7 +66,7 @@
 
     void clear();
 
-    UnacknowledgedMessage get(long deliveryTag);
+    QueueEntry get(long deliveryTag);
 
     /**
      * Get the set of delivery tags that are outstanding.

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Thu Aug 14 20:40:49 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.ack;
 
+import org.apache.qpid.server.store.StoreContext;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -28,6 +29,10 @@
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.TransactionalContext;
 
 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -36,7 +41,7 @@
 
     private long _unackedSize;
 
-    private Map<Long, UnacknowledgedMessage> _map;
+    private Map<Long, QueueEntry> _map;
 
     private long _lastDeliveryTag;
 
@@ -45,10 +50,10 @@
     public UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
-        _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
+        _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
     }
 
-    public void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
     {
         if (multiple)
         {
@@ -56,7 +61,7 @@
         }
         else
         {
-            msgs.add(get(deliveryTag));
+            msgs.put(deliveryTag, get(deliveryTag));
         }
 
     }
@@ -69,26 +74,27 @@
         }
     }
 
-    public void remove(List<UnacknowledgedMessage> msgs)
+    public void remove(Map<Long,QueueEntry> msgs)
     {
         synchronized (_lock)
         {
-            for (UnacknowledgedMessage msg : msgs)
+            for (Long deliveryTag : msgs.keySet())
             {
-                remove(msg.deliveryTag);
+                remove(deliveryTag);
             }
         }
     }
 
-    public UnacknowledgedMessage remove(long deliveryTag)
+    public QueueEntry remove(long deliveryTag)
     {
         synchronized (_lock)
         {
 
-            UnacknowledgedMessage message = _map.remove(deliveryTag);
+            QueueEntry message = _map.remove(deliveryTag);
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
+
             }
 
             return message;
@@ -99,21 +105,16 @@
     {
         synchronized (_lock)
         {
-            Collection<UnacknowledgedMessage> currentEntries = _map.values();
-            for (UnacknowledgedMessage msg : currentEntries)
+            Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
+            for (Map.Entry<Long, QueueEntry> entry : currentEntries)
             {
-                visitor.callback(msg);
+                visitor.callback(entry.getKey().longValue(), entry.getValue());
             }
             visitor.visitComplete();
         }
     }
 
-    public Object getLock()
-    {
-        return _lock;
-    }
-
-    public void add(long deliveryTag, UnacknowledgedMessage message)
+    public void add(long deliveryTag, QueueEntry message)
     {
         synchronized (_lock)
         {
@@ -123,12 +124,12 @@
         }
     }
 
-    public Collection<UnacknowledgedMessage> cancelAllMessages()
+    public Collection<QueueEntry> cancelAllMessages()
     {
         synchronized (_lock)
         {
-            Collection<UnacknowledgedMessage> currentEntries = _map.values();
-            _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+            Collection<QueueEntry> currentEntries = _map.values();
+            _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
             _unackedSize = 0l;
             return currentEntries;
         }
@@ -160,14 +161,15 @@
         }
     }
 
-    public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+   
     {
         synchronized (_lock)
         {
-            Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = _map.entrySet().iterator();
+            Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
             while (it.hasNext())
             {
-                Map.Entry<Long, UnacknowledgedMessage> unacked = it.next();
+                Map.Entry<Long, QueueEntry> unacked = it.next();
 
                 if (unacked.getKey() > deliveryTag)
                 {
@@ -176,10 +178,14 @@
                                            " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
                 }
 
+                //Message has been ack so discard it. This will dequeue and decrement the reference.
+                unacked.getValue().discard(storeContext);
+
                 it.remove();
+
                 _unackedSize -= unacked.getValue().getMessage().getSize();
 
-                destination.add(unacked.getValue());
+
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;
@@ -188,7 +194,7 @@
         }
     }
     
-    public UnacknowledgedMessage get(long key)
+    public QueueEntry get(long key)
     {
         synchronized (_lock)
         {
@@ -204,14 +210,14 @@
         }
     }
 
-    private void collect(Long key, List<UnacknowledgedMessage> msgs)
+    private void collect(long key, Map<Long, QueueEntry> msgs)
     {
         synchronized (_lock)
         {
-            for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+            for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
             {
-                msgs.add(entry.getValue());
-                if (entry.getKey().equals(key))
+                msgs.put(entry.getKey(),entry.getValue());
+                if (entry.getKey() == key)
                 {
                     break;
                 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Aug 14 20:40:49 2008
@@ -30,11 +30,13 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -176,11 +178,22 @@
                 boolean durable = queueConfiguration.getBoolean("durable" ,false);
                 boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
                 String owner = queueConfiguration.getString("owner", null);
+                FieldTable arguments = null;
+                Integer priorities = queueConfiguration.getInteger("priorities", null);
+                if(priorities != null && priorities.intValue() > 1)
+                {
+                    if(arguments == null)
+                    {
+                        arguments = new FieldTable();
+                    }
+                    arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
+                }
+
 
-                queue = new AMQQueue(queueName,
+                queue = AMQQueueFactory.createAMQQueueImpl(queueName,
                         durable,
                         owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */,
-                        autodelete /* Therefore autodelete makes no sence */, virtualHost);
+                        autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments);
 
                 if (queue.isDurable())
                 {
@@ -221,7 +234,7 @@
                     AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
                     
 
-                    queue.bind(routingKey, null, exchange);
+                    queue.bind(exchange, routingKey, null);
 
 
                     _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
@@ -229,7 +242,7 @@
 
                 if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
                 {
-                    queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());                    
+                    queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null);
                 }
             }
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Aug 14 20:40:49 2008
@@ -191,9 +191,7 @@
         {
             _exchangeMbean.unregister();
         }
-    }
-
-    abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+    }    
 
     public String toString()
     {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Thu Aug 14 20:40:49 2008
@@ -43,8 +43,8 @@
     public DefaultExchangeFactory(VirtualHost host)
     {
         _host = host;
-        registerExchangeType(DestNameExchange.TYPE);
-        registerExchangeType(DestWildExchange.TYPE);
+        registerExchangeType(DirectExchange.TYPE);
+        registerExchangeType(TopicExchange.TYPE);
         registerExchangeType(HeadersExchange.TYPE);
         registerExchangeType(FanoutExchange.TYPE);
     }
@@ -67,7 +67,7 @@
         if (exchType == null)
         {
 
-            throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null);
+            throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
         }
         Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete);
         return e;