You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/01/16 00:56:39 UTC

svn commit: r1558617 - in /qpid/branches/0.26/qpid: ./ java/ java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ java/client/src/main/java/org/apache/qpid/client/transport/ java/common/src/main/java/org/apache/qp...

Author: rgodfrey
Date: Wed Jan 15 23:56:37 2014
New Revision: 1558617

URL: http://svn.apache.org/r1558617
Log:
Merged r1558363 from trunk

Modified:
    qpid/branches/0.26/qpid/   (props changed)
    qpid/branches/0.26/qpid/java/   (props changed)
    qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/branches/0.26/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
    qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    qpid/branches/0.26/qpid/java/test-profiles/   (props changed)
    qpid/branches/0.26/qpid/java/test-profiles/Java010Excludes

Propchange: qpid/branches/0.26/qpid/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid:r1558363

Propchange: qpid/branches/0.26/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1558363

Modified: qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Wed Jan 15 23:56:37 2014
@@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -164,7 +163,8 @@ public class ProtocolEngine_0_10  extend
 
     public void readerIdle()
     {
-        //Todo
+        _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
+        _network.close();
     }
 
     public String getAddress()

Modified: qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Wed Jan 15 23:56:37 2014
@@ -549,16 +549,6 @@ public class ServerConnection extends Co
         super.setLocalAddress(localAddress);
     }
 
-    public void setNetworkConnection(NetworkConnection network)
-    {
-        _networkConnection = network;
-    }
-
-    public NetworkConnection getNetworkConnection()
-    {
-        return _networkConnection;
-    }
-
     public void doHeartbeat()
     {
         super.doHeartBeat();

Modified: qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/0.26/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Wed Jan 15 23:56:37 2014
@@ -236,7 +236,6 @@ public class ServerConnectionDelegate ex
         }
 
         final NetworkConnection networkConnection = sconn.getNetworkConnection();
-
         if(ok.hasHeartbeat())
         {
             int heartbeat = ok.getHeartbeat();
@@ -352,4 +351,11 @@ public class ServerConnectionDelegate ex
     {
         return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
     }
+
+    @Override
+    protected int getHeartbeatMax()
+    {
+        int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
+        return delay == 0 ? super.getHeartbeatMax() : delay;
+    }
 }

Modified: qpid/branches/0.26/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/0.26/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Wed Jan 15 23:56:37 2014
@@ -179,12 +179,9 @@ public class ClientConnectionDelegate ex
     }
 
     @Override
-    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
     {
-        // ClientDelegate simply responds to heartbeats with heartbeats
         _heartbeatListener.heartbeatReceived();
-        super.connectionHeartbeat(conn, hearbeat);
-        _heartbeatListener.heartbeatSent();
     }
 
 
@@ -192,4 +189,11 @@ public class ClientConnectionDelegate ex
     {
         _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
     }
+
+    @Override
+    public void writerIdle(final Connection connection)
+    {
+        super.writerIdle(connection);
+        _heartbeatListener.heartbeatSent();
+    }
 }

Modified: qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Wed Jan 15 23:56:37 2014
@@ -143,6 +143,8 @@ public class ClientDelegate extends Conn
                               actualHeartbeatInterval);
 
         int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+        conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+        conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
         conn.setIdleTimeout(idleTimeout);
 
         int channelMax = tune.getChannelMax();

Modified: qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Jan 15 23:56:37 2014
@@ -70,6 +70,7 @@ public class Connection extends Connecti
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
     private long _lastSendTime;
     private long _lastReadTime;
+    private NetworkConnection _networkConnection;
 
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -229,12 +230,13 @@ public class Connection extends Connecti
                 addConnectionListener((ConnectionListener)secureReceiver);
             }
 
-            NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+            _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
 
-            setRemoteAddress(network.getRemoteAddress());
-            setLocalAddress(network.getLocalAddress());
 
-            final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+            setRemoteAddress(_networkConnection.getRemoteAddress());
+            setLocalAddress(_networkConnection.getLocalAddress());
+
+            final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
             if(secureSender instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureSender);
@@ -785,14 +787,26 @@ public class Connection extends Connecti
         @Override
         public void writerIdle()
         {
+            getConnectionDelegate().writerIdle(Connection.this);
             connectionHeartbeat();
         }
 
         @Override
         public void readerIdle()
         {
-            // TODO
-
+            log.error("Closing connection as no heartbeat or other activity detected within specified interval");
+            _networkConnection.close();
         }
     }
+
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        _networkConnection = network;
+    }
+
+    public NetworkConnection getNetworkConnection()
+    {
+        return _networkConnection;
+    }
 }

Modified: qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/0.26/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Wed Jan 15 23:56:37 2014
@@ -105,4 +105,9 @@ public abstract class ConnectionDelegate
             ssn.closed();
         }
     }
+
+    public void writerIdle(final Connection connection)
+    {
+        connection.doHeartBeat();
+    }
 }

Propchange: qpid/branches/0.26/qpid/java/test-profiles/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/test-profiles:r1558363

Modified: qpid/branches/0.26/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/java/test-profiles/Java010Excludes?rev=1558617&r1=1558616&r2=1558617&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/branches/0.26/qpid/java/test-profiles/Java010Excludes Wed Jan 15 23:56:37 2014
@@ -67,9 +67,5 @@ org.apache.qpid.client.failover.AddressB
 // QPID-3604: Immediate Prefetch no longer supported by 0-10
 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
 
-// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
-org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
-
 // Java 0-10 client does not support re-binding the queue to the same exchange
 org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org