You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 16:50:31 UTC

svn commit: r1295635 [2/3] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker/ broker/bin/ broker/src/main/java/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Mar  1 15:50:27 2012
@@ -27,6 +27,7 @@ import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.Session
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session 
+        implements AuthorizationHolder, SessionConfig, 
+                   AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
     
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
 
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
-    private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
 
     private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
 
@@ -147,7 +150,7 @@ public class ServerSession extends Sessi
     {
         super(connection, delegate, name, expiry);
         _connectionConfig = connConfig;
-        _transaction = new AutoCommitTransaction(this.getMessageStore());
+        _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
         _logSubject = new ChannelLogSubject(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Sessi
             invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        PostEnqueueAction postTransactionAction;
-        if(isTransactional())
-        {
-           postTransactionAction = new PostEnqueueAction(queues, message) ;
-        }
-        else
-        {
-            postTransactionAction = _postEnqueueAction;
-            postTransactionAction.setState(queues, message);
-        }
+        PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
         _transaction.enqueue(queues,message, postTransactionAction, 0L);
         incrementOutstandingTxnsIfNecessary();
         updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Sessi
     public void accept(RangeSet ranges)
     {
         dispositionChange(ranges, new MessageDispositionAction()
-                                      {
-                                          public void performAction(MessageDispositionChangeListener listener)
-                                          {
-                                              listener.onAccept();
-                                          }
-                                      });
+        {
+            public void performAction(MessageDispositionChangeListener listener)
+            {
+                listener.onAccept();
+            }
+        });
     }
 
 
@@ -444,10 +438,7 @@ public class ServerSession extends Sessi
 
     public boolean isTransactional()
     {
-        // this does not look great but there should only be one "non-transactional"
-        // transactional context, while there could be several transactional ones in
-        // theory
-        return !(_transaction instanceof AutoCommitTransaction);
+        return _transaction.isTransactional();
     }
     
     public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Sessi
         {
             subscription_0_10.flushCreditState(false);
         }
+        awaitCommandCompletion();
     }
 
     private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Sessi
         private ServerMessage _message;
         private final boolean _transactional;
 
-        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
         {
-            _transactional = true;
+            _transactional = transactional;
             setState(queues, message);
         }
 
-        public PostEnqueueAction()
-        {
-            _transactional = false;
-        }
-
         public void setState(List<? extends BaseQueue> queues, ServerMessage message)
         {
             _message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Sessi
     {
         return _blocking.get();
     }
+
+    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
+
+    public void completeAsyncCommands()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
+        {
+            cmd.complete();
+            _unfinishedCommandsQueue.poll();
+        }
+        while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+        {
+            cmd = _unfinishedCommandsQueue.poll();
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+
+    public void awaitCommandCompletion()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.poll()) != null)
+        {
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+
+    public Object getAsyncCommandMark()
+    {
+        return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
+    }
+
+    public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+    {
+        _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+    }
+
+    private static class AsyncCommand
+    {
+        private final MessageStore.StoreFuture _future;
+        private ServerTransaction.Action _action;
+
+        public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+        {
+            _future = future;
+            _action = action;
+        }
+
+        void awaitReadyForCompletion()
+        {
+            _future.waitForCompletion();
+        }
+
+        void complete()
+        {
+            if(!_future.isComplete())
+            {
+                _future.waitForCompletion();
+            }
+            _action.postCommit();
+            _action = null;
+        }
+
+        boolean isReadyForCompletion()
+        {
+            return _future.isComplete();
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Mar  1 15:50:27 2012
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.transport;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 
@@ -81,9 +81,22 @@ public class ServerSessionDelegate exten
 
             if(!session.isClosing())
             {
-                super.command(session, method);
+                Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
+                super.command(session, method, false);
+                Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
+                if(newOutstanding == null || newOutstanding == asyncCommandMark)
+                {
+                    session.processed(method);    
+                }
+                
+                if(newOutstanding != null)
+                {
+                    ((ServerSession)session).completeAsyncCommands();
+                }
+
                 if (method.isSync())
                 {
+                    ((ServerSession)session).awaitCommandCompletion();
                     session.flushProcessed();
                 }
             }
@@ -98,7 +111,13 @@ public class ServerSessionDelegate exten
     @Override
     public void messageAccept(Session session, MessageAccept method)
     {
-        ((ServerSession)session).accept(method.getTransfers());
+        final ServerSession serverSession = (ServerSession) session;
+        serverSession.accept(method.getTransfers());
+        if(!serverSession.isTransactional())
+        {
+            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+                                       new CommandProcessedAction(serverSession, method));
+        }
     }
 
     @Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate exten
     }
 
     @Override
-    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
         final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
@@ -294,12 +313,13 @@ public class ServerSessionDelegate exten
             exchangeInUse = exchange;
         }
 
+        final ServerSession serverSession = (ServerSession) ssn;
         if(!queues.isEmpty())
         {
             final MessageStore store = getVirtualHost(ssn).getMessageStore();
             final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
-            MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
-            ((ServerSession) ssn).enqueue(message, queues);
+            MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+            serverSession.enqueue(message, queues);
             storeMessage.flushToStore();
         }
         else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+                serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
             }
         }
 
 
-
-        ssn.processed(xfr);
+        if(serverSession.isTransactional())
+        {
+            serverSession.processed(xfr);
+        }
+        else
+        {
+            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+        }
     }
 
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate exten
 
 
     @Override
+    public void executionSync(final Session ssn, final ExecutionSync sync)
+    {
+        ((ServerSession)ssn).awaitCommandCompletion();
+        super.executionSync(ssn, sync);
+    }
+
+    @Override
     public void exchangeDeclare(Session session, ExchangeDeclare method)
     {
         String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate exten
         final ServerConnection scon = (ServerConnection) session.getConnection();
         SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
     }
+
+    private static class CommandProcessedAction implements ServerTransaction.Action
+    {
+        private final ServerSession _serverSession;
+        private final Method _method;
+
+        public CommandProcessedAction(final ServerSession serverSession, final Method xfr)
+        {
+            _serverSession = serverSession;
+            _method = xfr;
+        }
+
+        public void postCommit()
+        {
+            _serverSession.processed(_method);
+        }
+
+        public void onRollback()
+        {
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Thu Mar  1 15:50:27 2012
@@ -242,6 +242,11 @@ public class AutoCommitTransaction imple
     {
     }
 
+    public boolean isTransactional()
+    {
+        return false;
+    }
+
     private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
     {
         if (txn != null)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Mar  1 15:50:27 2012
@@ -309,7 +309,12 @@ public class LocalTransaction implements
     private void resetDetails()
     {
         _transaction = null;
-	_postTransactionActions.clear();
+	    _postTransactionActions.clear();
         _txnStartTime = 0L;
     }
+
+    public boolean isTransactional()
+    {
+        return true;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Thu Mar  1 15:50:27 2012
@@ -41,6 +41,8 @@ import org.apache.qpid.server.queue.Queu
  */
 public interface ServerTransaction
 {
+
+
     /**
      * Represents an action to be performed on transaction commit or rollback
      */
@@ -108,4 +110,6 @@ public interface ServerTransaction
      * be executed immediately after the underlying transaction has rolled-back. 
      */
     void rollback();
+
+    boolean isTransactional();
 }

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -1,3 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java Thu Mar  1 15:50:27 2012
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Thu Mar  1 15:50:27 2012
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.AMQMessage;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java Thu Mar  1 15:50:27 2012
@@ -40,7 +40,12 @@ import org.apache.qpid.url.BindingURL;
  * to support any destination defined in AMQP 0-10 spec.
  */
 public class AMQAnyDestination extends AMQDestination implements Queue, Topic
-{    
+{
+    protected AMQAnyDestination()
+    {
+        super();
+    }
+
     public AMQAnyDestination(BindingURL binding)
     {
         super(binding);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Mar  1 15:50:27 2012
@@ -211,7 +211,7 @@ public class AMQConnectionDelegate_0_10 
                         + " port: " + brokerDetail.getPort() + " vhost: "
                         + _conn.getVirtualHost() + " username: "
                         + _conn.getUsername() + " password: "
-                        + _conn.getPassword());
+                        + "********");
             }
 
             ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Mar  1 15:50:27 2012
@@ -151,6 +151,10 @@ public abstract class AMQDestination imp
         return defaultDestSyntax;
     }
 
+    protected AMQDestination()
+    {  
+    }
+
     protected AMQDestination(Address address) throws Exception
     {
         this._address = address;
@@ -186,6 +190,11 @@ public abstract class AMQDestination imp
     
     protected AMQDestination(String str) throws URISyntaxException
     {
+        parseDestinationString(str);
+    }
+
+    protected void parseDestinationString(String str) throws URISyntaxException
+    {
         _destSyntax = getDestType(str);
         str = stripSyntaxPrefix(str);
         if (_destSyntax == DestSyntax.BURL)
@@ -305,6 +314,16 @@ public abstract class AMQDestination imp
         }
     }
 
+    public void setDestinationString(String str) throws Exception
+    {
+        parseDestinationString(str);
+    }
+
+    public String getDestinationString()
+    {
+        return toString();
+    }
+
     public DestSyntax getDestSyntax() 
     {
         return _destSyntax;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Thu Mar  1 15:50:27 2012
@@ -30,6 +30,10 @@ import org.apache.qpid.url.BindingURL;
 
 public class AMQQueue extends AMQDestination implements Queue
 {
+    protected AMQQueue()
+    {
+        super();
+    }
 
     public AMQQueue(String address) throws URISyntaxException
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar  1 15:50:27 2012
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
      */
-    protected volatile boolean _usingDispatcherForCleanup;
+    private volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -3570,3 +3570,4 @@ public abstract class AMQSession<C exten
     }
 
 }
+

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar  1 15:50:27 2012
@@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-                synchronized (getMessageDeliveryLock())
-                {
-                    for (BasicMessageConsumer consumer : _consumers.values())
-	            {
-	                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
-	                                             Option.UNRELIABLE);
-	                sync();
-	                List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
-	                _prefetchedMessageTags.addAll(tags);
-	            }
-                }
-
-                _usingDispatcherForCleanup = true;
-                syncDispatchQueue();
-                _usingDispatcherForCleanup = false;
-
-                RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
-		RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
-		RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
-					+ prefetched.size());
-
-		for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
-		{
-			Range range = deliveredIter.next();
-			all.add(range);
-		}
-
-		for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
-		{
-			Range range = prefetchedIter.next();
-			all.add(range);
-		}
-
-		flushProcessed(all, false);
-		getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
-		getQpidSession().messageRelease(prefetched);
-		sync();
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+                                             Option.UNRELIABLE);
+            }
         }
         else
         {
@@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQ
         getQpidSession().sync();
     }
 }
+

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Mar  1 15:50:27 2012
@@ -43,6 +43,11 @@ public class AMQTopic extends AMQDestina
         super(address);
     }
     
+    protected AMQTopic()
+    {
+        super();
+    }
+
     /**
      * Constructor for use in creating a topic using a BindingURL.
      *

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/test/example_build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/test/example_build.xml?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/test/example_build.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/test/example_build.xml Thu Mar  1 15:50:27 2012
@@ -73,7 +73,7 @@
         </javac>
 
         <copy todir="${example.classes}">
-            <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+            <!-- copy any non java src files into the build tree, e.g. properties files -->
             <fileset dir="${example.src}">
                 <exclude name="**/*.java"/>
                 <exclude name="**/package.html"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/messaging/Address.java Thu Mar  1 15:50:27 2012
@@ -35,41 +35,42 @@ import static org.apache.qpid.messaging.
 public class Address
 {
 
+    private String _name;
+    private String _subject;
+    private Map _options;
+    private final String _myToString;
+
     public static Address parse(String address)
     {
         return new AddressParser(address).parse();
     }
 
-    private String name;
-    private String subject;
-    private Map options;
-
     public Address(String name, String subject, Map options)
     {
-        this.name = name;
-        this.subject = subject;
-        this.options = options;
+        this._name = name;
+        this._subject = subject;
+        this._options = options;
+        this._myToString = String.format("%s/%s; %s", pprint(_name), pprint(_subject), pprint(_options));
     }
 
     public String getName()
     {
-        return name;
+        return _name;
     }
 
     public String getSubject()
     {
-        return subject;
+        return _subject;
     }
 
     public Map getOptions()
     {
-        return options;
+        return _options;
     }
 
     public String toString()
     {
-        return String.format("%s/%s; %s", pprint(name), pprint(subject),
-                             pprint(options));
+        return _myToString;
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Mar  1 15:50:27 2012
@@ -33,7 +33,6 @@ import static org.apache.qpid.transport.
 
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.transport.network.Frame;
-import static org.apache.qpid.transport.util.Functions.mod;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
 import static org.apache.qpid.util.Serial.ge;
@@ -44,11 +43,9 @@ import static org.apache.qpid.util.Seria
 import static org.apache.qpid.util.Strings.toUTF8;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +59,7 @@ import java.util.concurrent.atomic.Atomi
 public class Session extends SessionInvoker
 {
     private static final Logger log = Logger.get(Session.class);
-    
+
     public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
 
     static class DefaultSessionListener implements SessionListener
@@ -113,7 +110,9 @@ public class Session extends SessionInvo
 
     // outgoing command count
     private int commandsOut = 0;
-    private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+    private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
+    private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+    private final Object commandsLock = new Object();
     private int commandBytes = 0;
     private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
     private int maxComplete = commandsOut - 1;
@@ -196,7 +195,7 @@ public class Session extends SessionInvo
 
     public void setAutoSync(boolean value)
     {
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             this.autoSync = value;
         }
@@ -204,10 +203,10 @@ public class Session extends SessionInvo
 
     protected void setState(State state)
     {
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             this.state = state;
-            commands.notifyAll();
+            commandsLock.notifyAll();
         }
     }
 
@@ -276,13 +275,13 @@ public class Session extends SessionInvo
     void resume()
     {
         _failoverRequired.set(false);
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             attach();
 
             for (int i = maxComplete + 1; lt(i, commandsOut); i++)
             {
-                Method m = commands[mod(i, commands.length)];
+                Method m = getCommand(i);
                 if (m == null)
                 {
                     m = new ExecutionSync();
@@ -337,11 +336,27 @@ public class Session extends SessionInvo
         }
     }
 
+    private Method getCommand(int i)
+    {
+        return commands.get(i);
+    }
+
+    private void setCommand(int commandId, Method command)
+    {
+        commands.put(commandId, command);
+    }
+
+    private Method removeCommand(int id)
+    {
+        return commands.remove(id);
+    }
+
     void dump()
     {
-        synchronized (commands)
+        synchronized (commandsLock)
         {
-            for (Method m : commands)
+            TreeMap<Integer, Method> ordered = new TreeMap<Integer, Method>(commands);
+            for (Method m : ordered.values())
             {
                 if (m != null)
                 {
@@ -484,7 +499,7 @@ public class Session extends SessionInvo
             copy = processed.copy();
         }
 
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             if (state == DETACHED || state == CLOSING || state == CLOSED)
             {
@@ -539,18 +554,16 @@ public class Session extends SessionInvo
         {
             log.debug("%s complete(%d, %d)", this, lower, upper);
         }
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             int old = maxComplete;
             for (int id = max(maxComplete, lower); le(id, upper); id++)
             {
-                int idx = mod(id, commands.length);
-                Method m = commands[idx];
+                Method m = removeCommand(id);
                 if (m != null)
                 {
                     commandBytes -= m.getBodySize();
                     m.complete();
-                    commands[idx] = null;
                 }
             }
             if (le(lower, maxComplete + 1))
@@ -563,7 +576,7 @@ public class Session extends SessionInvo
                 log.debug("%s   commands remaining: %s", this, commandsOut - maxComplete);
             }
 
-            commands.notifyAll();
+            commandsLock.notifyAll();
             return gt(maxComplete, old);
         }
     }
@@ -596,7 +609,7 @@ public class Session extends SessionInvo
 
     protected boolean isCommandsFull(int id)
     {
-        return id - maxComplete >= commands.length;
+        return id - maxComplete >= commandLimit;
     }
 
     public void invoke(Method m)
@@ -613,7 +626,7 @@ public class Session extends SessionInvo
                 acquireCredit();
             }
             
-            synchronized (commands)
+            synchronized (commandsLock)
             {
                 if (state == DETACHED && m.isUnreliable())
                 {
@@ -629,7 +642,7 @@ public class Session extends SessionInvo
                     Thread current = Thread.currentThread();
                     if (!current.equals(resumer) )
                     {
-                        Waiter w = new Waiter(commands, timeout);
+                        Waiter w = new Waiter(commandsLock, timeout);
                         while (w.hasTime() && (state != OPEN && state != CLOSED))
                         {
                             checkFailoverRequired("Command was interrupted because of failover, before being sent");
@@ -678,7 +691,7 @@ public class Session extends SessionInvo
 
                 if (isFull(next))
                 {
-                    Waiter w = new Waiter(commands, timeout);
+                    Waiter w = new Waiter(commandsLock, timeout);
                     while (w.hasTime() && isFull(next) && state != CLOSED)
                     {
                         if (state == OPEN || state == RESUMING)
@@ -735,7 +748,7 @@ public class Session extends SessionInvo
                 
                 if ((replayTransfer) || m.hasCompletionListener())
                 {
-                    commands[mod(next, commands.length)] = m;
+                    setCommand(next, m);
                     commandBytes += m.getBodySize();
                 }
                 if (autoSync)
@@ -817,7 +830,7 @@ public class Session extends SessionInvo
     public void sync(long timeout)
     {
         log.debug("%s sync()", this);
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             int point = commandsOut - 1;
 
@@ -826,19 +839,13 @@ public class Session extends SessionInvo
                 executionSync(SYNC);
             }
 
-            Waiter w = new Waiter(commands, timeout);
+            Waiter w = new Waiter(commandsLock, timeout);
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
                 checkFailoverRequired("Session sync was interrupted by failover.");                               
                 if(log.isDebugEnabled())
                 {
-                    List<Method> waitingFor =
-                            Arrays.asList(commands)
-                                  .subList(mod(maxComplete,commands.length),
-                                           mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length)
-                                             ? commands.length-1
-                                             : mod(commandsOut-1, commands.length));
-                    log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor);
+                    log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, commands);
                 }
                 w.await();
             }
@@ -909,7 +916,7 @@ public class Session extends SessionInvo
 
     protected <T> Future<T> invoke(Method m, Class<T> klass)
     {
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             int command = commandsOut;
             ResultFuture<T> future = new ResultFuture<T>(klass);
@@ -1019,7 +1026,7 @@ public class Session extends SessionInvo
         {
             log.debug("Closing [%s] in state [%s]", this, state);
         }
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             switch(state)
             {
@@ -1043,7 +1050,7 @@ public class Session extends SessionInvo
 
     protected void awaitClose() 
     {
-        Waiter w = new Waiter(commands, timeout);
+        Waiter w = new Waiter(commandsLock, timeout);
         while (w.hasTime() && state != CLOSED)
         {
             checkFailoverRequired("close() was interrupted by failover.");
@@ -1063,7 +1070,7 @@ public class Session extends SessionInvo
 
     public void closed()
     {
-        synchronized (commands)
+        synchronized (commandsLock)
         {
             if (closing || getException() != null)
             {
@@ -1074,7 +1081,7 @@ public class Session extends SessionInvo
                 state = DETACHED;
             }
 
-            commands.notifyAll();
+            commandsLock.notifyAll();
 
             synchronized (results)
             {
@@ -1171,9 +1178,9 @@ public class Session extends SessionInvo
         //prevent them waiting for timeout for 60 seconds
         //and possibly preventing failover proceeding
         _failoverRequired.set(true);
-        synchronized (commands)
+        synchronized (commandsLock)
         {
-            commands.notifyAll();
+            commandsLock.notifyAll();
         }
         synchronized (results)
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Thu Mar  1 15:50:27 2012
@@ -45,10 +45,15 @@ public class SessionDelegate
         method.dispatch(ssn, this);
     }
 
-    public void command(Session ssn, Method method) {
+    public void command(Session ssn, Method method)
+    {
+        command(ssn, method, !method.hasPayload());
+    }
+    public void command(Session ssn, Method method, boolean processed) 
+    {
         ssn.identify(method);
         method.dispatch(ssn, this);
-        if (!method.hasPayload())
+        if (processed)
         {
             ssn.processed(method);
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/ConnectionFactoryProperties.java Thu Mar  1 15:50:27 2012
@@ -87,8 +87,9 @@ public class ConnectionFactoryProperties
    {
       if (_log.isTraceEnabled())
       {
-         _log.trace("setConnectionURL(" + connectionURL + ")");
+         _log.trace("setConnectionURL(" + Util.maskUrlForLog(connectionURL) + ")");
       }
+
       _hasBeenUpdated = true;
       this._connectionURL = connectionURL;
    }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/QpidResourceAdapter.java Thu Mar  1 15:50:27 2012
@@ -425,11 +425,6 @@ public class QpidResourceAdapter impleme
     */
    public void setConnectionURL(final String connectionURL)
    {
-      if (_log.isTraceEnabled())
-      {
-         _log.trace("setConnectionURL(" + connectionURL + ")");
-      }
-
       _raProperties.setConnectionURL(connectionURL);
    }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/jca/src/main/java/org/apache/qpid/ra/Util.java Thu Mar  1 15:50:27 2012
@@ -34,8 +34,10 @@ import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.transaction.TransactionManager;
 
+import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.ra.admin.QpidQueue;
 import org.apache.qpid.ra.admin.QpidTopic;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,4 +183,19 @@ public class Util
    {
       return (object == null ? "null" : object.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(object))) ;
    }
+
+
+   public static String maskUrlForLog(final String url)
+   {
+       String results = null;
+
+       try
+       {
+           results = new AMQConnectionURL(url).toString();
+       }
+       catch(Exception ignore){}
+
+       return (results == null) ? url : results;
+   }
+
 }

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/management/LoggingManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747869,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 15:50:27 2012
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1230000
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/module.xml?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/module.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/module.xml Thu Mar  1 15:50:27 2012
@@ -264,7 +264,7 @@
       <classpath refid="module.class.path"/>
     </javac>
 
-    <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+    <!-- copy any non java src files into the build tree, e.g. properties files -->
     <copy todir="${module.classes}" verbose="true">
       <fileset dir="${module.src}">
         <exclude name="**/*.java"/>
@@ -285,7 +285,7 @@
       <classpath refid="module.test.path"/>
     </javac>
 
-    <!-- copy any non java src files into the build tree, e.g. log4j.properties -->
+    <!-- copy any non java src files into the build tree, e.g. properties files -->
     <copy todir="${module.test.classes}" verbose="true">
       <fileset dir="${module.test.src}">
         <exclude name="**/*.java"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Thu Mar  1 15:50:27 2012
@@ -1,24 +1,3 @@
-package org.apache.qpid.client.prefetch;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
@@ -39,6 +18,26 @@ import org.slf4j.LoggerFactory;
 * under the License.
 *
 */
+package org.apache.qpid.client.prefetch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 public class PrefetchBehaviourTest extends QpidBrokerTestCase
 {
     protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
@@ -132,44 +131,66 @@ public class PrefetchBehaviourTest exten
         //wait for the other consumer to finish to ensure it completes ok
         _logger.debug("waiting for async consumer to complete");
         assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
-        assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
+        assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
     }
 
     /**
-     * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
-     * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
-     *                Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
-     *                Try to receive all 10 messages.
+     * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
+     *
      */
-    public void testConnectionStop() throws Exception
+    public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
     {
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
-        Connection con = getConnection();
-        con.start();
-        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+        Queue queue = getTestQueue();
+
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
 
-        MessageProducer prod = ssn.createProducer(queue);
-        for (int i=0; i<10;i++)
+        Connection connection = getConnection();
+        connection.start();
+        // Create Consumer A
+        Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumerA = consSessA.createConsumer(queue);
+
+        // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
+        final Message msg = consumerA.receiveNoWait();
+        assertNull(msg);
+
+        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        sendMessage(producerSession, queue, 3);
+
+        // Create Consumer B
+        MessageConsumer consumerB = null;
+        if (isBroker010())
         {
-           prod.send(ssn.createTextMessage("Msg" + i));
+            // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
+            consumerB = consSessA.createConsumer(queue);
         }
+        else
+        {
+            // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
+            Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            consumerB = consSessB.createConsumer(queue);
+        }
+
+        // As message delivery to consumer A is already started, the first two messages should
+        // now be with consumer A.  The last message will still be on the Broker as consumer A's
+        // credit is exhausted and message delivery for consumer B is not yet running.
+
+        // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
+        // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
+        // and the third message could be delivered to either Consumer A or Consumer B.
+
+        // Check that consumer B gets the last (third) message.
+        final Message msgConsumerB = consumerB.receive(1500);
+        assertNotNull("Consumer B should have received a message", msgConsumerB);
+        assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));
 
-        MessageConsumer consumer = ssn.createConsumer(queue);
-        // This is to ensure we get the first client to prefetch.
-        Message msg = consumer.receive(1000);
-        assertNotNull("The first consumer should get one message",msg);
-        con.stop();
-
-        Connection con2 = getConnection();
-        con2.start();
-        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = ssn2.createConsumer(queue);
-        for (int i=0; i<9;i++)
+        // Now check that consumer A has indeed got the first two messages.
+        for (int i = 0; i < 2; i++)
         {
-           TextMessage m = (TextMessage)consumer2.receive(1000);
-           assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
+            final Message msgConsumerA = consumerA.receive(1500);
+            assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
+            assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
         }
     }
-
 }
+

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java Thu Mar  1 15:50:27 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -37,9 +38,13 @@ import org.apache.qpid.client.AMQSession
 import org.apache.qpid.management.common.mbeans.ManagedConnection;
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnectionMBeanTest.class);
+
     /**
      * JMX helper.
      */
@@ -120,9 +125,10 @@ public class ManagedConnectionMBeanTest 
 
         _connection.close();
 
+        LOGGER.debug("Querying JMX for number of open connections");
         connections = _jmxUtils.getManagedConnections("test");
         assertNotNull("Connection MBean is not found", connections);
-        assertEquals("Unexpected number of connection mbeans", 0, connections.size());
+        assertEquals("Unexpected number of connection mbeans after connection closed", 0, connections.size());
     }
 
     public void testCommit() throws Exception
@@ -218,13 +224,13 @@ public class ManagedConnectionMBeanTest 
         mBean.rollbackTransactions(channelId.intValue());
 
         Message m = consumer.receive(1000l);
-        assertNull("Unexpected message received", m);
+        assertNull("Unexpected message received: " + String.valueOf(m), m);
 
         producerSession.commit();
 
         _connection.start();
         m = consumer.receive(1000l);
-        assertNull("Unexpected message received", m);
+        assertNull("Unexpected message received after commit " + String.valueOf(m), m);
     }
 
     public void testAuthorisedId() throws Exception

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Thu Mar  1 15:50:27 2012
@@ -31,6 +31,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
@@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends
 
             ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true);
 
-            AMQFrame exchangeDeclare = body.generateFrame(0);
+            AMQFrame exchangeDeclare = body.generateFrame(((AMQSession)_session).getChannelId());
 
             ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeleteOkBody.class);
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1295635&r1=1295634&r2=1295635&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Thu Mar  1 15:50:27 2012
@@ -147,7 +147,6 @@ public class MessageGroupQueueTest exten
 
         assertNotNull("Consumer 2 should have received first message", cs2Received);
 
-        cs1Received.acknowledge();
         cs2Received.acknowledge();
 
         Message cs2Received2 = consumer2.receive(1000);
@@ -156,6 +155,7 @@ public class MessageGroupQueueTest exten
         assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
                      cs2Received.getStringProperty("group"));
 
+        cs1Received.acknowledge();
         Message cs1Received2 = consumer1.receive(1000);
 
         assertNotNull("Consumer 1 should have received second message", cs1Received2);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org