You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/01/23 19:07:50 UTC

svn commit: r737125 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/configuration/ client/src/main/java/org/apache/qpid/jms/ common/src/main/java/org/apache/qpid/ common/src/main/java...

Author: rajith
Date: Fri Jan 23 10:07:49 2009
New Revision: 737125

URL: http://svn.apache.org/viewvc?rev=737125&view=rev
Log:
This is related to QPID-1609.
Currently we only check idle state on the incoming side.
In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Jan 23 10:07:49 2009
@@ -20,25 +20,21 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -57,17 +53,33 @@
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
@@ -356,7 +368,7 @@
             // use the defaul value set for all connections
             _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
         }
-
+        
         _failoverPolicy = new FailoverPolicy(connectionURL);
         BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
         if (brokerDetails.getTransport().equals(BrokerDetails.VM))
@@ -493,7 +505,7 @@
 
             throw new AMQConnectionFailureException(message, connectionException);
         }
-
+        
         _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
@@ -1456,4 +1468,9 @@
     {
         return _syncPersistence;
     }
+    
+    public void setIdleTimeout(long l)
+    {
+        _delegate.setIdleTimeout(l);
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Jan 23 10:07:49 2009
@@ -48,5 +48,6 @@
     void closeConnection(long timeout) throws JMSException, AMQException;
 
     <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+    
+    void setIdleTimeout(long l);
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Jan 23 10:07:49 2009
@@ -22,7 +22,6 @@
 
 
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -31,21 +30,19 @@
 import javax.jms.XASession;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.ErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionClose;
 import org.apache.qpid.transport.ConnectionException;
 import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.ProtocolVersionException;
 import org.apache.qpid.transport.TransportException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,6 +143,17 @@
                               " username: " + _conn.getUsername() +
                               " password: " + _conn.getPassword());
             }
+            
+            if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+            {
+                this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
+            }
+            else
+            {
+                // use the default value set for all connections
+                this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
+            }
+            
             _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
                                     _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
             _conn._connected = true;
@@ -273,4 +281,8 @@
         }
     }
 
+    public void setIdleTimeout(long l)
+    {
+        _qpidConnection.setIdleTimeout(l);
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Jan 23 10:07:49 2009
@@ -48,7 +48,6 @@
 import org.apache.qpid.framing.TxSelectOkBody;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.apache.qpid.transport.network.io.IoTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -288,5 +287,6 @@
             }
         }
     }
-
+    
+    public void setIdleTimeout(long l){}
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Fri Jan 23 10:07:49 2009
@@ -46,6 +46,18 @@
      * type: boolean
      */
     public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+    
+    
+    /**
+     * This value will be used in the following settings
+     * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
+     * If this value is between the max and min values specified for heartbeat
+     * by the broker in TuneOK, it will be used as the heartbeat interval.
+     * If not, a warning will be printed and the max value specified for
+     * heartbeat in TuneOK will be used
+     */
+    public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
+    
 
      /**
      * ==========================================================

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Jan 23 10:07:49 2009
@@ -34,6 +34,7 @@
     public static final String OPTIONS_RETRY = "retries";
     public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
     public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
+    public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
     public static final int DEFAULT_PORT = 5672;
 
     public static final String SOCKET = "socket";
@@ -55,7 +56,7 @@
     public static final String VIRTUAL_HOST = "virtualhost";
     public static final String CLIENT_ID = "client_id";
     public static final String USERNAME = "username";
-    public static final String PASSWORD = "password";
+    public static final String PASSWORD = "password";    
 
     String getHost();
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java Fri Jan 23 10:07:49 2009
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid;
 
+import static org.apache.qpid.transport.util.Functions.str;
+
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.transport.Sender;
 
-import static org.apache.qpid.transport.util.Functions.*;
-
 
 /**
  * ConsoleOutput
@@ -51,4 +51,13 @@
         System.out.println("CLOSED");
     }
 
+    @Override
+    public void setIdleTimeout(long l)
+    {
+        // TODO Auto-generated method stub
+        
+    }
+    
+    
+
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Jan 23 10:07:49 2009
@@ -20,27 +20,17 @@
  */
 package org.apache.qpid.transport;
 
-import java.util.ArrayList;
+import static org.apache.qpid.transport.Connection.State.OPEN;
+
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import java.io.UnsupportedEncodingException;
-
-import org.apache.qpid.QpidException;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
 
-
-import static org.apache.qpid.transport.Connection.State.*;
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
+import org.apache.qpid.transport.util.Logger;
 
 
 /**
@@ -50,6 +40,7 @@
 
 public class ClientDelegate extends ConnectionDelegate
 {
+    private static final Logger log = Logger.get(ClientDelegate.class);
 
     private String vhost;
     private String username;
@@ -121,7 +112,14 @@
     @Override public void connectionTune(Connection conn, ConnectionTune tune)
     {
         conn.setChannelMax(tune.getChannelMax());
-        conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+        int hb_interval = calculateHeartbeatInterval(conn,
+                                                     tune.getHeartbeatMin(),
+                                                     tune.getHeartbeatMax()
+                                                     );
+        conn.connectionTuneOk(tune.getChannelMax(), 
+                              tune.getMaxFrameSize(), 
+                              hb_interval);
+        conn.setIdleTimeout(hb_interval*1000);
         conn.connectionOpen(vhost, null, Option.INSIST);
     }
 
@@ -134,5 +132,22 @@
     {
         throw new UnsupportedOperationException();
     }
-
+    
+    /**
+     * Currently the spec specifies the min and max for heartbeat in seconds
+     */
+    private int calculateHeartbeatInterval(Connection conn,int min, int max)
+    {
+        long l = conn.getIdleTimeout()/1000;
+        if (l !=0 && l >= min && l <= max)
+        {
+            return (int)l;
+        }
+        else
+        {
+            log.warn("Ignoring the idle timeout %s set by the connection," +
+            		" using the brokers max value %s", l,max);
+            return max;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Jan 23 10:07:49 2009
@@ -83,7 +83,8 @@
     private String locale;
     private SaslServer saslServer;
     private SaslClient saslClient;
-
+    private long idleTimeout = 0;
+    
     // want to make this final
     private int _connectionId;
 
@@ -114,6 +115,7 @@
     public void setSender(Sender<ProtocolEvent> sender)
     {
         this.sender = sender;
+        sender.setIdleTimeout(idleTimeout);         
     }
 
     void setState(State state)
@@ -497,6 +499,20 @@
         }
     }
 
+    public void setIdleTimeout(long l)
+    {
+        idleTimeout = l;       
+        if (sender != null)
+        {            
+            sender.setIdleTimeout(l);    
+        }
+    }
+    
+    public long getIdleTimeout()
+    {
+        return idleTimeout;
+    }
+    
     public String toString()
     {
         return String.format("conn:%x", System.identityHashCode(this));

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java Fri Jan 23 10:07:49 2009
@@ -28,6 +28,7 @@
 
 public interface Sender<T>
 {
+    void setIdleTimeout(long l);
 
     void send(T msg);
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Jan 23 10:07:49 2009
@@ -20,7 +20,15 @@
  */
 package org.apache.qpid.transport.network;
 
-import org.apache.qpid.transport.codec.BBEncoder;
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
@@ -31,13 +39,7 @@
 import org.apache.qpid.transport.SegmentType;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.Struct;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.qpid.transport.network.Frame.*;
-
-import static java.lang.Math.*;
+import org.apache.qpid.transport.codec.BBEncoder;
 
 
 /**
@@ -235,5 +237,9 @@
     {
         throw new IllegalArgumentException("" + error);
     }
-
+    
+    public void setIdleTimeout(long l)
+    {
+        sender.setIdleTimeout(l);
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Jan 23 10:07:49 2009
@@ -143,9 +143,6 @@
                   t.getMessage().equalsIgnoreCase("socket closed") &&
                   closed.get()))
             {
-                log.error(t, "===========================================================");
-                log.error(t, "Exception");
-                log.error(t, "===========================================================");
                 receiver.exception(t);
             }
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Jan 23 10:07:49 2009
@@ -18,6 +18,8 @@
  */
 package org.apache.qpid.transport.network.io;
 
+import static org.apache.qpid.transport.util.Functions.mod;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
@@ -30,8 +32,6 @@
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.util.Logger;
 
-import static org.apache.qpid.transport.util.Functions.*;
-
 
 public final class IoSender implements Runnable, Sender<ByteBuffer>
 {
@@ -56,6 +56,7 @@
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
+    private long idleTimeout;
     
     private volatile Throwable exception = null;
 
@@ -223,8 +224,7 @@
 
     public void run()
     {
-        final int size = buffer.length;
-
+        final int size = buffer.length;       
         while (true)
         {
             final int hd = head;
@@ -294,4 +294,16 @@
         }
     }
 
+    public void setIdleTimeout(long l)
+    {
+        try
+        {
+            socket.setSoTimeout((int)l*2);
+            idleTimeout = l;
+        }
+        catch (Exception e)
+        {
+            throw new SenderException(e);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java Fri Jan 23 10:07:49 2009
@@ -24,7 +24,6 @@
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
-
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 
@@ -77,5 +76,15 @@
         CloseFuture closed = session.close();
         closed.join();
     }
-
+    
+    public void setIdleTimeout(long l)
+    {
+      //noop
+    }
+    
+    public long getIdleTimeout()
+    {
+        return 0;
+    }
+    
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java Fri Jan 23 10:07:49 2009
@@ -118,4 +118,9 @@
             }
         }
     }
+    
+    public void setIdleTimeout(long l)
+    {
+      //noop
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java Fri Jan 23 10:07:49 2009
@@ -232,4 +232,9 @@
     {
         return engineState;
     }
+    
+    public void setIdleTimeout(long l)
+    {
+        delegate.setIdleTimeout(l);
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org