You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2012/11/30 12:55:35 UTC

svn commit: r1415591 [2/3] - in /qpid/branches/java-broker-config-qpid-4390/qpid/java: ./ amqp-1-0-client-jms/ amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/ amqp-1-0-common/ bdbstore/bin/ bdbstore/jmx/ bdbstore/j...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Nov 30 11:55:29 2012
@@ -43,6 +43,8 @@ public class ProtocolEngine_0_10  extend
     private ServerConnection _connection;
 
     private long _createTime = System.currentTimeMillis();
+    private long _lastReadTime;
+    private long _lastWriteTime;
 
     public ProtocolEngine_0_10(ServerConnection conn,
                                NetworkConnection network)
@@ -68,13 +70,61 @@ public class ProtocolEngine_0_10  extend
     {
         _network = network;
 
-        _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+        _connection.setNetworkConnection(network);
+        _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
         _connection.setPeerPrincipal(_network.getPeerPrincipal());
         // FIXME Two log messages to maintain compatibility with earlier protocol versions
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
     }
 
+    // Wraps the given sender: every call is forwarded to it unchanged, and send()
+    // additionally stamps _lastWriteTime so getLastWriteTime() reflects outbound traffic.
+    private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+    {
+        return new Sender<ByteBuffer>()
+        {
+            @Override
+            public void setIdleTimeout(int i)
+            {
+                sender.setIdleTimeout(i);
+            }
+
+            @Override
+            public void send(ByteBuffer msg)
+            {
+                // The timestamp is taken before delegating, i.e. when the write is requested.
+                _lastWriteTime = System.currentTimeMillis();
+                sender.send(msg);
+            }
+
+            @Override
+            public void flush()
+            {
+                // Plain pass-through; flush() does not update _lastWriteTime.
+                sender.flush();
+            }
+
+            @Override
+            public void close()
+            {
+                sender.close();
+            }
+        };
+    }
+
+    @Override
+    public long getLastReadTime()
+    {
+        return _lastReadTime; // millis stamped at the top of received(); 0 until the first read
+    }
+
+    @Override
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime; // millis stamped by the wrapSender() wrapper on each send(); 0 until the first write
+    }
+
     public SocketAddress getRemoteAddress()
     {
         return _network.getRemoteAddress();
@@ -87,6 +137,7 @@ public class ProtocolEngine_0_10  extend
 
     public void received(final ByteBuffer buf)
     {
+        _lastReadTime = System.currentTimeMillis();
         super.received(buf);
         _connection.receivedComplete();
     }
@@ -103,7 +154,7 @@ public class ProtocolEngine_0_10  extend
 
     public void writerIdle()
     {
-        //Todo
+        _connection.doHeartbeat();
     }
 
     public void readerIdle()

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Fri Nov 30 11:55:29 2012
@@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implem
     //private NetworkConnection _networkDriver;
     private long _readBytes;
     private long _writtenBytes;
+    private long _lastReadTime;
+    private long _lastWriteTime;
     private final IApplicationRegistry _appRegistry;
     private long _createTime = System.currentTimeMillis();
     private ConnectionEndpoint _conn;
@@ -97,10 +99,14 @@ public class ProtocolEngine_1_0_0 implem
 
 
 
-    public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
+    public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, final IApplicationRegistry appRegistry, long id)
     {
         _appRegistry = appRegistry;
         _connectionId = id;
+        if(networkDriver != null)
+        {
+            setNetworkConnection(networkDriver, networkDriver.getSender());
+        }
     }
 
 
@@ -178,6 +184,7 @@ public class ProtocolEngine_1_0_0 implem
 
     public synchronized void received(ByteBuffer msg)
     {
+        _lastReadTime = System.currentTimeMillis();
         if(RAW_LOGGER.isLoggable(Level.FINE))
         {
             ByteBuffer dup = msg.duplicate();
@@ -320,6 +327,7 @@ public class ProtocolEngine_1_0_0 implem
         synchronized(_sendLock)
         {
 
+            _lastWriteTime = System.currentTimeMillis();
             if(FRAME_LOGGER.isLoggable(Level.FINE))
             {
                 FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -374,4 +382,13 @@ public class ProtocolEngine_1_0_0 implem
         return _connectionId;
     }
 
+    public long getLastReadTime()
+    {
+        return _lastReadTime; // millis stamped on entry to received(); 0 until the first read
+    }
+
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime; // millis stamped inside the _sendLock block on the frame send path; 0 until the first write
+    }
 }

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Fri Nov 30 11:55:29 2012
@@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL i
 {
        private long _readBytes;
        private long _writtenBytes;
+
+       private long _lastReadTime;
+       private long _lastWriteTime;
        private final IApplicationRegistry _appRegistry;
        private long _createTime = System.currentTimeMillis();
        private ConnectionEndpoint _conn;
@@ -221,6 +224,7 @@ public class ProtocolEngine_1_0_0_SASL i
 
     public synchronized void received(ByteBuffer msg)
     {
+        _lastReadTime = System.currentTimeMillis();
         if(RAW_LOGGER.isLoggable(Level.FINE))
         {
             ByteBuffer dup = msg.duplicate();
@@ -363,7 +367,7 @@ public class ProtocolEngine_1_0_0_SASL i
 
          synchronized(_sendLock)
          {
-
+             _lastWriteTime = System.currentTimeMillis();
              if(FRAME_LOGGER.isLoggable(Level.FINE))
              {
                  FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -424,4 +428,13 @@ public class ProtocolEngine_1_0_0_SASL i
          return _connectionId;
      }
 
+    public long getLastReadTime()
+    {
+        return _lastReadTime; // millis stamped on entry to received(); 0 until the first read
+    }
+
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime; // millis stamped inside the _sendLock block on the frame send path; 0 until the first write
+    }
 }

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Nov 30 11:55:29 2012
@@ -41,6 +41,14 @@ import org.apache.qpid.server.virtualhos
 
 public class AMQQueueFactory
 {
+    public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+    public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+    public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+    public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+    public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+    public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+    public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
     public static final String X_QPID_DESCRIPTION = "x-qpid-description";
     public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
     }
 
     private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
-            new QueueLongProperty("x-qpid-maximum-message-age")
+            new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {
                     queue.setMaximumMessageAge(value);
                 }
             },
-            new QueueLongProperty("x-qpid-maximum-message-size")
+            new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {
                     queue.setMaximumMessageSize(value);
                 }
             },
-            new QueueLongProperty("x-qpid-maximum-message-count")
+            new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {
                     queue.setMaximumMessageCount(value);
                 }
             },
-            new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+            new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumQueueDepth(value);
+                }
+            },
+            new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {
                     queue.setMinimumAlertRepeatGap(value);
                 }
             },
-            new QueueLongProperty("x-qpid-capacity")
+            new QueueLongProperty(X_QPID_CAPACITY)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {
                     queue.setCapacity(value);
                 }
             },
-            new QueueLongProperty("x-qpid-flow-resume-capacity")
+            new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
             {
                 public void setPropertyValue(AMQQueue queue, long value)
                 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java Fri Nov 30 11:55:29 2012
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.security.Principal;
 import java.util.HashMap;
 import java.util.Hashtable;
+
+import javax.naming.AuthenticationException;
 import javax.naming.Context;
 import javax.naming.NamingEnumeration;
 import javax.naming.NamingException;
@@ -40,6 +42,7 @@ import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.sasl.plain.PlainPasswordCallback;
 
@@ -119,33 +122,74 @@ public class SimpleLDAPAuthenticationMan
     @Override
     public AuthenticationResult authenticate(String username, String password)
     {
-
         try
         {
-            return doLDAPNameAuthentication(getNameFromId(username), password);
+            AuthenticationResult result = doLDAPNameAuthentication(getNameFromId(username), password);
+            if(result.getStatus() == AuthenticationStatus.SUCCESS)
+            {
+                //Return a result based on the supplied username rather than the search name
+                return new AuthenticationResult(new UsernamePrincipal(username));
+            }
+            else
+            {
+                return result;
+            }
         }
         catch (NamingException e)
         {
-
             return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
-
         }
     }
 
-    private AuthenticationResult doLDAPNameAuthentication(String username, String password) throws NamingException
+    private AuthenticationResult doLDAPNameAuthentication(String name, String password)
     {
+        if(name == null)
+        {
+            //The search didn't return anything, class as not-authenticated before it NPEs below
+            return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+        }
+
         Hashtable<Object,Object> env = new Hashtable<Object,Object>();
         env.put(Context.INITIAL_CONTEXT_FACTORY, _ldapContextFactory);
         env.put(Context.PROVIDER_URL, _providerAuthURL);
 
         env.put(Context.SECURITY_AUTHENTICATION, "simple");
 
-        env.put(Context.SECURITY_PRINCIPAL, username);
+        env.put(Context.SECURITY_PRINCIPAL, name);
         env.put(Context.SECURITY_CREDENTIALS, password);
-        DirContext ctx = new InitialDirContext(env);
-        ctx.close();
 
-        return new AuthenticationResult(new UsernamePrincipal(username));
+        DirContext ctx = null;
+        try
+        {
+            ctx = new InitialDirContext(env);
+
+            //Authentication succeeded
+            return new AuthenticationResult(new UsernamePrincipal(name));
+        }
+        catch(AuthenticationException ae)
+        {
+            //Authentication failed
+            return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+        }
+        catch (NamingException e)
+        {
+            //Some other failure
+            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+        }
+        finally
+        {
+            if(ctx != null)
+            {
+                try
+                {
+                    ctx.close();
+                }
+                catch (Exception e)
+                {
+                    _logger.warn("Exception closing InitialDirContext", e);
+                }
+            }
+        }
     }
 
     @Override
@@ -190,19 +234,11 @@ public class SimpleLDAPAuthenticationMan
                     }
                     catch (NamingException e)
                     {
-                        _logger.info("SASL Authentication Error", e);
+                        _logger.warn("SASL Authentication Exception", e);
                     }
                     if(password != null)
                     {
-                        try
-                        {
-                            authenticated = doLDAPNameAuthentication(name, password);
-
-                        }
-                        catch (NamingException e)
-                        {
-                            authenticated = new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
-                        }
+                        authenticated = doLDAPNameAuthentication(name, password);
                     }
                 }
                 else if (callback instanceof PlainPasswordCallback)
@@ -210,17 +246,10 @@ public class SimpleLDAPAuthenticationMan
                     password = ((PlainPasswordCallback)callback).getPlainPassword();
                     if(name != null)
                     {
-                        try
-                        {
-                            authenticated = doLDAPNameAuthentication(name, password);
-                            if(authenticated.getStatus()== AuthenticationResult.AuthenticationStatus.SUCCESS)
-                            {
-                                ((PlainPasswordCallback)callback).setAuthenticated(true);
-                            }
-                        }
-                        catch (NamingException e)
+                        authenticated = doLDAPNameAuthentication(name, password);
+                        if(authenticated.getStatus()== AuthenticationResult.AuthenticationStatus.SUCCESS)
                         {
-                            authenticated = new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+                            ((PlainPasswordCallback)callback).setAuthenticated(true);
                         }
                     }
                 }
@@ -242,7 +271,6 @@ public class SimpleLDAPAuthenticationMan
         env.put(Context.INITIAL_CONTEXT_FACTORY, _ldapContextFactory);
         env.put(Context.PROVIDER_URL, _providerSearchURL);
 
-
         env.put(Context.SECURITY_AUTHENTICATION, "none");
         DirContext ctx = null;
 
@@ -267,7 +295,14 @@ public class SimpleLDAPAuthenticationMan
         }
         finally
         {
-            ctx.close();
+            try
+            {
+                ctx.close();
+            }
+            catch (Exception e)
+            {
+                _logger.warn("Exception closing InitialDirContext", e);
+            }
         }
 
     }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Nov 30 11:55:29 2012
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.Executi
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -68,6 +69,7 @@ public class ServerConnection extends Co
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
     private Principal _peerPrincipal;
+    private NetworkConnection _networkConnection;
 
     public ServerConnection(final long connectionId)
     {
@@ -490,4 +492,20 @@ public class ServerConnection extends Co
     {
         super.setLocalAddress(localAddress);
     }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        _networkConnection = network; // stored as-is (no null check); read back via getNetworkConnection()
+    }
+
+    public NetworkConnection getNetworkConnection()
+    {
+        return _networkConnection; // field has no initializer, so this is null until setNetworkConnection() is called
+    }
+
+    // Public entry point that simply invokes the inherited doHeartBeat() (note the capital B in the superclass name).
+    public void doHeartbeat()
+    {
+        super.doHeartBeat();
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -43,6 +43,8 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,14 +230,18 @@ public class ServerConnectionDelegate ex
             return;
         }
 
-        setConnectionTuneOkChannelMax(sconn, okChannelMax);
-    }
+        if(ok.hasHeartbeat())
+        {
+            final int heartbeat = ok.getHeartbeat();
+            if(heartbeat > 0)
+            {
+                final NetworkConnection networkConnection = sconn.getNetworkConnection();
+                networkConnection.setMaxReadIdle(2 * heartbeat);
+                networkConnection.setMaxWriteIdle(heartbeat);
+            }
+        }
 
-    @Override
-    protected int getHeartbeatMax()
-    {
-        //TODO: implement broker support for actually sending heartbeats
-        return 0;
+        setConnectionTuneOkChannelMax(sconn, okChannelMax);
     }
 
     @Override

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Fri Nov 30 11:55:29 2012
@@ -20,11 +20,31 @@
 */
 package org.apache.qpid.server.protocol;
 
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
 import org.apache.commons.configuration.XMLConfiguration;
 
 import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.State;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.TestNetworkConnection;
 
@@ -42,6 +62,14 @@ public class MultiVersionProtocolEngineF
 
         _appRegistry = new TestApplicationRegistry(new XMLConfiguration());
         ApplicationRegistry.initialise(_appRegistry);
+        // AMQP 1-0 connection needs default vhost to be present
+        IApplicationRegistry registry = ApplicationRegistry.getInstance();
+        VirtualHostRegistry virtualHostRegistry = registry.getVirtualHostRegistry();
+        VirtualHostImpl vhostImpl = new VirtualHostImpl(virtualHostRegistry, registry, registry.getSecurityManager(),
+                new VirtualHostConfiguration("default",new XMLConfiguration(), registry.getBroker()));
+        virtualHostRegistry.registerVirtualHost(vhostImpl);
+        virtualHostRegistry.setDefaultVirtualHostName("default");
+
     }
 
     protected void tearDown()
@@ -161,6 +189,7 @@ public class MultiVersionProtocolEngineF
             assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
 
             previousId = expectedID;
+            engine.closed();
         }
     }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.19.0
+ver: 0.21.0
 
 Bundle-SymbolicName: qpid-client
 Bundle-Version: ${ver}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Nov 30 11:55:29 2012
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Close
                          + localAddress + " to " + remoteAddress);
         }
     }
+
+    void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _delegate.setHeartbeatListener(listener); // package-private: simply forwards the listener to _delegate
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
      * @return true if the feature is supported by the server
      */
     boolean isSupportedServerFeature(final String featureName);
+
+    void setHeartbeatListener(HeartbeatListener listener);
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Nov 30 11:55:29 2012
@@ -33,6 +33,7 @@ import org.apache.qpid.configuration.Cli
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
@@ -214,7 +215,8 @@ public class AMQConnectionDelegate_0_10 
                         + "********");
             }
 
-            ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
+            ConnectionSettings conSettings = retrieveConnectionSettings(brokerDetail);
+
             _qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL()));
             _qpidConnection.connect(conSettings);
 
@@ -420,7 +422,13 @@ public class AMQConnectionDelegate_0_10 
         return featureSupported;
     }
 
-    private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
+    @Override
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+    }
+
+    private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
     {
         ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
 
@@ -442,6 +450,24 @@ public class AMQConnectionDelegate_0_10 
 
         conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
 
+        //Check connection-level ssl override setting
+        String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+        if(connectionSslOption != null)
+        {
+            boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+            boolean brokerlistUseSsl = conSettings.isUseSSL();
+
+            if( connUseSsl != brokerlistUseSsl)
+            {
+                conSettings.setUseSSL(connUseSsl);
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+                }
+            }
+        }
+
         return conSettings;
     }
 
@@ -464,10 +490,14 @@ public class AMQConnectionDelegate_0_10 
             heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
             _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
         }
-        else
+        else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
         {
             heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
         }
+        else
+        {
+            heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
+        }
         return heartbeat;
     }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Nov 30 11:55:29 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.TxSelectB
 import org.apache.qpid.framing.TxSelectOkBody;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -100,32 +101,30 @@ public class AMQConnectionDelegate_8_0 i
         ConnectionSettings settings = brokerDetail.buildConnectionSettings();
         settings.setProtocol(brokerDetail.getTransport());
 
-        SSLContext sslContext = null;
-        if (settings.isUseSSL())
+        //Check connection-level ssl override setting
+        String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+        if(connectionSslOption != null)
         {
-            try
-            {
-                sslContext = SSLContextFactory.buildClientContext(
-                                settings.getTrustStorePath(),
-                                settings.getTrustStorePassword(),
-                                settings.getTrustStoreType(),
-                                settings.getTrustManagerFactoryAlgorithm(),
-                                settings.getKeyStorePath(),
-                                settings.getKeyStorePassword(),
-                                settings.getKeyStoreType(),
-                                settings.getKeyManagerFactoryAlgorithm(),
-                                settings.getCertAlias());
-            }
-            catch (GeneralSecurityException e)
+            boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+            boolean brokerlistUseSsl = settings.isUseSSL();
+
+            if( connUseSsl != brokerlistUseSsl)
             {
-                throw new AMQException("Unable to create SSLContext: " + e.getMessage(), e);
+                settings.setUseSSL(connUseSsl);
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+                }
             }
         }
 
         SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
 
         OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
-        NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
+
+        NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+                                                      _conn.getProtocolHandler());
         _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
 
         StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
@@ -379,4 +378,10 @@ public class AMQConnectionDelegate_8_0 i
         // we just hardcode JMS selectors as supported.
         return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
     }
+
+    @Override
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _conn.getProtocolHandler().setHeartbeatListener(listener);
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Fri Nov 30 11:55:29 2012
@@ -196,7 +196,14 @@ public class JMSObjectMessage extends Ab
         if (data != null && data.hasRemaining())
         {
             ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data));
-            result = (Serializable) in.readObject();
+            try
+            {
+                result = (Serializable) in.readObject();
+            }
+            finally
+            {
+                in.close();
+            }
         }
         return result;
     }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Nov 30 11:55:29 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.protocol;
 
+import org.apache.qpid.client.HeartbeatListener;
 import org.apache.qpid.util.BytesDataOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,6 +179,9 @@ public class AMQProtocolHandler implemen
 
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
+    private long _lastReadTime = System.currentTimeMillis();
+    private long _lastWriteTime = System.currentTimeMillis();
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
@@ -300,7 +304,6 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         //  failover:
-        HeartbeatDiagnostics.timeout();
         _logger.warn("Timed out while waiting for heartbeat from peer.");
         _network.close();
     }
@@ -309,7 +312,7 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         writeFrame(HeartbeatBody.FRAME);
-        HeartbeatDiagnostics.sent();
+        _heartbeatListener.heartbeatSent();
     }
 
     /**
@@ -442,6 +445,7 @@ public class AMQProtocolHandler implemen
     public void received(ByteBuffer msg)
     {
         _readBytes += msg.remaining();
+        _lastReadTime = System.currentTimeMillis();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -470,8 +474,6 @@ public class AMQProtocolHandler implemen
 
                         final AMQBody bodyFrame = frame.getBodyFrame();
 
-                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
                         bodyFrame.handle(frame.getChannel(), _protocolSession);
 
                         _connection.bytesReceived(_readBytes);
@@ -560,6 +562,7 @@ public class AMQProtocolHandler implemen
     public  synchronized void writeFrame(AMQDataBlock frame, boolean flush)
     {
         final ByteBuffer buf = asByteBuffer(frame);
+        _lastWriteTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
         _sender.send(buf);
         if(flush)
@@ -882,6 +885,18 @@ public class AMQProtocolHandler implemen
         _sender = sender;
     }
 
+    @Override
+    public long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    @Override
+    public long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
     protected Sender<ByteBuffer> getSender()
     {
         return _sender;
@@ -894,7 +909,6 @@ public class AMQProtocolHandler implemen
         {
             _network.setMaxWriteIdle(delay);
             _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
-            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
 
@@ -909,5 +923,13 @@ public class AMQProtocolHandler implemen
     }
 
 
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 
+    public void heartbeatBodyReceived()
+    {
+        _heartbeatListener.heartbeatReceived();
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Nov 30 11:55:29 2012
@@ -267,7 +267,7 @@ public class AMQProtocolSession implemen
 
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
     {
-
+        _protocolHandler.heartbeatBodyReceived();
     }
 
     /**

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.transport;
 
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSException;
 import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
     }
 
     private final ConnectionURL _connectionURL;
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
 
         return null;
     }
+
+    @Override
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    {
+        // ClientDelegate simply responds to heartbeats with heartbeats
+        _heartbeatListener.heartbeatReceived();
+        super.connectionHeartbeat(conn, hearbeat);
+        _heartbeatListener.heartbeatSent();
+    }
+
+
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Nov 30 11:55:29 2012
@@ -44,6 +44,13 @@ public interface ConnectionURL
     public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
 
     /**
+     * This option is used to apply a connection level override of
+     * the {@value BrokerDetails#OPTIONS_SSL} option values in the
+     * {@value ConnectionURL#OPTIONS_BROKERLIST}.
+     */
+    public static final String OPTIONS_SSL = "ssl";
+
+    /**
      * This option is only applicable for 0-8/0-9/0-9-1 protocols connection
      * <p>
      * It tells the client to delegate the requeue/DLQ decision to the

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Fri Nov 30 11:55:29 2012
@@ -143,4 +143,25 @@ public class BrokerDetailsTest extends T
 
         assertEquals("Unexpected toString", expectedToString, actualToString);
     }
+
+    public void testDefaultSsl() throws URLSyntaxException
+    {
+        String brokerURL = "tcp://localhost:5672";
+        AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+        assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_SSL));
+    }
+
+    public void testOverridingSsl() throws URLSyntaxException
+    {
+        String brokerURL = "tcp://localhost:5672?ssl='true'";
+        AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+        assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+
+        brokerURL = "tcp://localhost:5672?ssl='false''&maxprefetch='1'";
+        broker = new AMQBrokerDetails(brokerURL);
+
+        assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Fri Nov 30 11:55:29 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.url.URLSyntaxExce
 
 public class ConnectionURLTest extends TestCase
 {
-
     public void testFailoverURL() throws URLSyntaxException
     {
         String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
@@ -563,5 +562,34 @@ public class ConnectionURLTest extends T
         assertNull("Reject behaviour option was not as expected",
                 connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
     }
+
+    /**
+     * Verify that when the ssl option is not specified, asking for the option returns null,
+     * such that this can later be used to verify it wasn't specified.
+     */
+    public void testDefaultSsl() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL));
+    }
+
+    /**
+     * Verify that when the ssl option is specified, asking for the option returns the value,
+     * such that this can later be used to verify what value it was specified as.
+     */
+    public void testOverridingSsl() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='true'";
+        ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+        assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+
+        url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='false'";
+        connectionURL = new AMQConnectionURL(url);
+
+        assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+    }
 }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml Fri Nov 30 11:55:29 2012
@@ -23,7 +23,10 @@
   <dirname property="project.root" file="${ant.file.common}"/>
 
   <property name="project.name"          value="qpid"/>
-  <property name="project.version"       value="0.19"/>
+  <!-- Version used for standard build output -->
+  <property name="project.version"       value="0.21"/>
+  <!-- The release version used for maven output. SNAPSHOT added via maven.version.suffix -->
+  <property name="project.version.maven" value="0.22"/>
   <property name="project.url"           value="http://qpid.apache.org"/>
   <property name="project.groupid"       value="org.apache.qpid"/>
   <property name="project.namever"       value="${project.name}-${project.version}"/>
@@ -42,7 +45,7 @@
   <property name="build.report"          location="${build}/report"/>
   <property name="build.release"         location="${build}/release"/>
   <property name="build.release.prepare" location="${build.release}/prepare"/>
-  <property name="build.plugins"         location="${build}/lib/plugins"/>
+  <property name="build.lib.broker.plugins"  location="${build}/lib/broker-plugins"/>
   <property name="build.coverage.report" location="${build}/coverage/report"/>
   <property name="build.coverage.src"    location="${build}/coverage/src"/>
   <property name="build.findbugs"        location="${build}/findbugs"/>

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run Fri Nov 30 11:55:29 2012
@@ -88,10 +88,10 @@ SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPI
 if [ -n "$QPID_LOG_PREFIX" ]; then
     if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then
         log $INFO Using pid in qpid log name prefix
-        LOG_PREFIX=" -Dlogprefix=$$"
+        LOG_PREFIX="-Dlogprefix=$$"
     else
         log $INFO Using qpid logprefix property
-        LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
+        LOG_PREFIX="-Dlogprefix=$QPID_LOG_PREFIX"
     fi
     SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_PREFIX}"
 fi
@@ -99,10 +99,10 @@ fi
 if [ -n "$QPID_LOG_SUFFIX" ]; then
     if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then
         log $INFO Using pid in qpid log name suffix
-        LOG_SUFFIX=" -Dlogsuffix=$$"
+        LOG_SUFFIX="-Dlogsuffix=$$"
     else
         log $INFO Using qpig logsuffix property
-        LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
+        LOG_SUFFIX="-Dlogsuffix=$QPID_LOG_SUFFIX"
     fi
     SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_SUFFIX}"
 fi

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.19.0
+ver: 0.21.0
 
 Bundle-SymbolicName: qpid-common
 Bundle-Version: ${ver}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Fri Nov 30 11:55:29 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.protocol;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -31,7 +32,7 @@ import java.nio.ByteBuffer;
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
  * decodes it and then process the result.
  */
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
 {
    // Returns the remote address of the NetworkDriver
    SocketAddress getRemoteAddress();
@@ -56,6 +57,6 @@ public interface ProtocolEngine extends 
    void readerIdle();
 
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+   public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
 
 }
\ No newline at end of file

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Nov 30 11:55:29 2012
@@ -21,12 +21,7 @@
 package org.apache.qpid.transport;
 
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.*;
 import org.apache.qpid.transport.network.security.SecurityLayer;
 import org.apache.qpid.transport.network.security.SecurityLayerFactory;
 import org.apache.qpid.transport.util.Logger;
@@ -73,6 +68,9 @@ public class Connection extends Connecti
     //Usable channels are numbered 0 to <ChannelMax> - 1
     public static final int MAX_CHANNEL_MAX = 0xFFFF;
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
+    private long _lastSendTime;
+    private long _lastReadTime;
+
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
@@ -231,7 +229,8 @@ public class Connection extends Connecti
                 addConnectionListener((ConnectionListener)secureReceiver);
             }
 
-            NetworkConnection network = transport.connect(settings, secureReceiver, null);
+            NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+
             setRemoteAddress(network.getRemoteAddress());
             setLocalAddress(network.getLocalAddress());
 
@@ -368,6 +367,7 @@ public class Connection extends Connecti
 
     public void received(ProtocolEvent event)
     {
+        _lastReadTime = System.currentTimeMillis();
         if(log.isDebugEnabled())
         {
             log.debug("RECV: [%s] %s", this, event);
@@ -377,6 +377,7 @@ public class Connection extends Connecti
 
     public void send(ProtocolEvent event)
     {
+        _lastSendTime = System.currentTimeMillis();
         if(log.isDebugEnabled())
         {
             log.debug("SEND: [%s] %s", this, event);
@@ -745,4 +746,38 @@ public class Connection extends Connecti
         sessionDetached.setCode(sessionDetachCode);
         invoke(sessionDetached);
     }
+
+
+    protected void doHeartBeat()
+    {
+        connectionHeartbeat();
+    }
+
+    private class ConnectionActivity implements TransportActivity
+    {
+        @Override
+        public long getLastReadTime()
+        {
+            return _lastReadTime;
+        }
+
+        @Override
+        public long getLastWriteTime()
+        {
+            return _lastSendTime;
+        }
+
+        @Override
+        public void writerIdle()
+        {
+            connectionHeartbeat();
+        }
+
+        @Override
+        public void readerIdle()
+        {
+            // TODO: handle the read-idle timeout; this callback is currently a no-op
+
+        }
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext;
 
 public interface IncomingNetworkTransport extends NetworkTransport
 {
-    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
+    public void accept(NetworkTransportConfiguration config,
+                       ProtocolEngineFactory factory,
+                       SSLContext sslContext);
 }
\ No newline at end of file

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -50,4 +50,8 @@ public interface NetworkConnection
     void setPeerPrincipal(Principal principal);
 
     Principal getPeerPrincipal();
+
+    int getMaxReadIdle();
+
+    int getMaxWriteIdle();
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -23,12 +23,13 @@ package org.apache.qpid.transport.networ
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
 
-import javax.net.ssl.SSLContext;
 import java.nio.ByteBuffer;
 
 public interface OutgoingNetworkTransport extends NetworkTransport
 {
     public NetworkConnection getConnection();
 
-    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
+    public NetworkConnection connect(ConnectionSettings settings,
+                                     Receiver<ByteBuffer> delegate,
+                                     TransportActivity transportActivity);
 }
\ No newline at end of file

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
 import java.security.Principal;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.NetworkConnection;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,14 +40,23 @@ public class IoNetworkConnection impleme
     private final IoSender _ioSender;
     private final IoReceiver _ioReceiver;
     private Principal _principal;
+    private int _maxReadIdle;
+    private int _maxWriteIdle;
 
     public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
-            int sendBufferSize, int receiveBufferSize, long timeout)
+                               int sendBufferSize, int receiveBufferSize, long timeout)
+    {
+        this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null);
+    }
+
+    public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+            int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
     {
         _socket = socket;
         _timeout = timeout;
 
         _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+        _ioReceiver.setTicker(ticker);
 
         _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
 
@@ -88,14 +99,12 @@ public class IoNetworkConnection impleme
 
     public void setMaxWriteIdle(int sec)
     {
-        // TODO implement support for setting heartbeating config in this way
-        // Currently a socket timeout is used in IoSender
+        _maxWriteIdle = sec;
     }
 
     public void setMaxReadIdle(int sec)
     {
-        // TODO implement support for setting heartbeating config in this way
-        // Currently a socket timeout is used in IoSender
+        _maxReadIdle = sec;
     }
 
     @Override
@@ -109,4 +118,16 @@ public class IoNetworkConnection impleme
     {
         return _principal;
     }
+
+    @Override
+    public int getMaxReadIdle()
+    {
+        return _maxReadIdle;
+    }
+
+    @Override
+    public int getMaxWriteIdle()
+    {
+        return _maxWriteIdle;
+    }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -41,9 +41,8 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.*;
+
 import org.slf4j.LoggerFactory;
 
 public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -56,7 +55,9 @@ public class IoNetworkTransport implemen
     private IoNetworkConnection _connection;
     private AcceptingThread _acceptor;
 
-    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
+    public NetworkConnection connect(ConnectionSettings settings,
+                                     Receiver<ByteBuffer> delegate,
+                                     TransportActivity transportActivity)
     {
         int sendBufferSize = settings.getWriteBufferSize();
         int receiveBufferSize = settings.getReadBufferSize();
@@ -91,7 +92,9 @@ public class IoNetworkTransport implemen
 
         try
         {
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT);
+            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            ticker.setConnection(_connection);
             _connection.start();
         }
         catch(Exception e)
@@ -128,7 +131,9 @@ public class IoNetworkTransport implemen
         return _connection;
     }
 
-    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+    public void accept(NetworkTransportConfiguration config,
+                       ProtocolEngineFactory factory,
+                       SSLContext sslContext)
     {
         try
         {
@@ -149,6 +154,7 @@ public class IoNetworkTransport implemen
         private ProtocolEngineFactory _factory;
         private SSLContext _sslContext;
         private ServerSocket _serverSocket;
+        private int _timeout;
 
         private AcceptingThread(NetworkTransportConfiguration config,
                                 ProtocolEngineFactory factory,
@@ -157,6 +163,7 @@ public class IoNetworkTransport implemen
             _config = config;
             _factory = factory;
             _sslContext = sslContext;
+            _timeout = TIMEOUT;
 
             InetSocketAddress address = config.getAddress();
 
@@ -217,6 +224,7 @@ public class IoNetworkTransport implemen
                     {
                         socket = _serverSocket.accept();
                         socket.setTcpNoDelay(_config.getTcpNoDelay());
+                        socket.setSoTimeout(_timeout);
 
                         final Integer sendBufferSize = _config.getSendBufferSize();
                         final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -226,7 +234,10 @@ public class IoNetworkTransport implemen
 
                         ProtocolEngine engine = _factory.newProtocolEngine();
 
-                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT);
+                        final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+                                                                               ticker);
+                        ticker.setConnection(connection);
 
                         if(_sslContext != null)
                         {
@@ -293,6 +304,7 @@ public class IoNetworkTransport implemen
                 }
             }
         }
+
     }
 
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Nov 30 11:55:29 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.util.Logger;
 
 import javax.net.ssl.SSLSocket;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -51,6 +53,8 @@ final class IoReceiver implements Runnab
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread receiverThread;
     private static final boolean shutdownBroken;
+
+    private Ticker _ticker;
     static
     {
         String osName = System.getProperty("os.name");
@@ -136,7 +140,7 @@ final class IoReceiver implements Runnab
     {
         final int threshold = bufferSize / 2;
 
-        // I set the read buffer size simillar to SO_RCVBUF
+        // I set the read buffer size similar to SO_RCVBUF
         // Haven't tested with a lower value to see if it's better or worse
         byte[] buffer = new byte[bufferSize];
         try
@@ -144,17 +148,64 @@ final class IoReceiver implements Runnab
             InputStream in = socket.getInputStream();
             int read = 0;
             int offset = 0;
-            while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+            long currentTime;
+            while(read != -1)
             {
-                if (read > 0)
+                try
                 {
-                    ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
-                    receiver.received(b);
-                    offset+=read;
-                    if (offset > threshold)
+                    while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
                     {
-                        offset = 0;
-                        buffer = new byte[bufferSize];
+                        if (read > 0)
+                        {
+                            ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+                            receiver.received(b);
+                            offset+=read;
+                            if (offset > threshold)
+                            {
+                                offset = 0;
+                                buffer = new byte[bufferSize];
+                            }
+                        }
+                        currentTime =  System.currentTimeMillis();
+
+                        if(_ticker != null)
+                        {
+                            int tick = _ticker.getTimeToNextTick(currentTime);
+                            if(tick <= 0)
+                            {
+                                tick = _ticker.tick(currentTime);
+                            }
+                            try
+                            {
+                                if(!socket.isClosed())
+                                {
+                                    socket.setSoTimeout(tick <= 0 ? 1 : tick);
+                                }
+                            }
+                            catch(SocketException e)
+                            {
+                                // ignore - closed socket
+                            }
+                        }
+                    }
+                }
+                catch (SocketTimeoutException e)
+                {
+                    currentTime = System.currentTimeMillis();
+                    if(_ticker != null)
+                    {
+                        final int tick = _ticker.tick(currentTime);
+                        if(!socket.isClosed())
+                        {
+                            try
+                            {
+                                socket.setSoTimeout(tick <= 0 ? 1 : tick );
+                            }
+                            catch(SocketException ex)
+                            {
+                                // ignore - closed socket
+                            }
+                        }
                     }
                 }
             }
@@ -195,4 +246,15 @@ final class IoReceiver implements Runnab
         return !brokenClose && !sslSocketClosed;
     }
 
+    public Ticker getTicker()
+    {
+        return _ticker;
+    }
+
+    public void setTicker(Ticker ticker)
+    {
+        _ticker = ticker;
+    }
+
+
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -83,6 +83,18 @@ public class TestNetworkConnection imple
         return null;
     }
 
+    @Override
+    public int getMaxReadIdle()
+    {
+        return 0;
+    }
+
+    @Override
+    public int getMaxWriteIdle()
+    {
+        return 0;
+    }
+
     public void setMaxWriteIdle(int idleTime)
     {
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Fri Nov 30 11:55:29 2012
@@ -128,7 +128,8 @@ public class TransportTest extends QpidT
         }
 
         public NetworkConnection connect(ConnectionSettings settings,
-                Receiver<ByteBuffer> delegate, SSLContext sslContext)
+                                         Receiver<ByteBuffer> delegate,
+                                         TransportActivity transportActivity)
         {
             throw new UnsupportedOperationException();
         }
@@ -148,7 +149,7 @@ public class TransportTest extends QpidT
         }
 
         public void accept(NetworkTransportConfiguration config,
-                ProtocolEngineFactory factory, SSLContext sslContext)
+                           ProtocolEngineFactory factory, SSLContext sslContext)
         {
             throw new UnsupportedOperationException();
         }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.19.0
+ver: 0.21.0
 
 Bundle-SymbolicName: qpid-management-common
 Bundle-Version: ${ver}

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:r1411034-1415148



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org