You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/01/23 19:07:50 UTC
svn commit: r737125 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/configuration/
client/src/main/java/org/apache/qpid/jms/
common/src/main/java/org/apache/qpid/ common/src/main/java...
Author: rajith
Date: Fri Jan 23 10:07:49 2009
New Revision: 737125
URL: http://svn.apache.org/viewvc?rev=737125&view=rev
Log:
This is related to QPID-1609.
Currently we only check idle state on the incoming side.
In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Jan 23 10:07:49 2009
@@ -20,25 +20,21 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -57,17 +53,33 @@
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -356,7 +368,7 @@
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
}
-
+
_failoverPolicy = new FailoverPolicy(connectionURL);
BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
@@ -493,7 +505,7 @@
throw new AMQConnectionFailureException(message, connectionException);
}
-
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -1456,4 +1468,9 @@
{
return _syncPersistence;
}
+
+ public void setIdleTimeout(long l)
+ {
+ _delegate.setIdleTimeout(l);
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Jan 23 10:07:49 2009
@@ -48,5 +48,6 @@
void closeConnection(long timeout) throws JMSException, AMQException;
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+
+ void setIdleTimeout(long l);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Jan 23 10:07:49 2009
@@ -22,7 +22,6 @@
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.List;
@@ -31,21 +30,19 @@
import javax.jms.XASession;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.ErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.TransportException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,6 +143,17 @@
" username: " + _conn.getUsername() +
" password: " + _conn.getPassword());
}
+
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
+ }
+ else
+ {
+ // use the default value set for all connections
+ this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
+ }
+
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
_conn._connected = true;
@@ -273,4 +281,8 @@
}
}
+ public void setIdleTimeout(long l)
+ {
+ _qpidConnection.setIdleTimeout(l);
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Jan 23 10:07:49 2009
@@ -48,7 +48,6 @@
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,5 +287,6 @@
}
}
}
-
+
+ public void setIdleTimeout(long l){}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Fri Jan 23 10:07:49 2009
@@ -46,6 +46,18 @@
* type: boolean
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+
+
+ /**
+ * This value will be used in the following settings:
+ * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout).
+ * If this value is between the max and min values specified for heartbeat
+ * by the broker in TuneOK it will be used as the heartbeat interval.
+ * If not, a warning will be printed and the max value specified for
+ * heartbeat in TuneOK will be used.
+ */
+ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
+
/**
* ==========================================================
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Jan 23 10:07:49 2009
@@ -34,6 +34,7 @@
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
public static final int DEFAULT_PORT = 5672;
public static final String SOCKET = "socket";
@@ -55,7 +56,7 @@
public static final String VIRTUAL_HOST = "virtualhost";
public static final String CLIENT_ID = "client_id";
public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
+ public static final String PASSWORD = "password";
String getHost();
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java Fri Jan 23 10:07:49 2009
@@ -20,12 +20,12 @@
*/
package org.apache.qpid;
+import static org.apache.qpid.transport.util.Functions.str;
+
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Sender;
-import static org.apache.qpid.transport.util.Functions.*;
-
/**
* ConsoleOutput
@@ -51,4 +51,13 @@
System.out.println("CLOSED");
}
+ @Override
+ public void setIdleTimeout(long l)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Jan 23 10:07:49 2009
@@ -20,27 +20,17 @@
*/
package org.apache.qpid.transport;
-import java.util.ArrayList;
+import static org.apache.qpid.transport.Connection.State.OPEN;
+
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import java.io.UnsupportedEncodingException;
-
-import org.apache.qpid.QpidException;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import static org.apache.qpid.transport.Connection.State.*;
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
+import org.apache.qpid.transport.util.Logger;
/**
@@ -50,6 +40,7 @@
public class ClientDelegate extends ConnectionDelegate
{
+ private static final Logger log = Logger.get(ClientDelegate.class);
private String vhost;
private String username;
@@ -121,7 +112,14 @@
@Override public void connectionTune(Connection conn, ConnectionTune tune)
{
conn.setChannelMax(tune.getChannelMax());
- conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ int hb_interval = calculateHeartbeatInterval(conn,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax()
+ );
+ conn.connectionTuneOk(tune.getChannelMax(),
+ tune.getMaxFrameSize(),
+ hb_interval);
+ conn.setIdleTimeout(hb_interval*1000);
conn.connectionOpen(vhost, null, Option.INSIST);
}
@@ -134,5 +132,22 @@
{
throw new UnsupportedOperationException();
}
-
+
+ /**
+ * Currently the spec specifies the min and max for heartbeat in seconds.
+ */
+ private int calculateHeartbeatInterval(Connection conn,int min, int max)
+ {
+ long l = conn.getIdleTimeout()/1000;
+ if (l !=0 && l >= min && l <= max)
+ {
+ return (int)l;
+ }
+ else
+ {
+ log.warn("Ignoring the idle timeout %s set by the connection," +
+ " using the brokers max value %s", l,max);
+ return max;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Jan 23 10:07:49 2009
@@ -83,7 +83,8 @@
private String locale;
private SaslServer saslServer;
private SaslClient saslClient;
-
+ private long idleTimeout = 0;
+
// want to make this final
private int _connectionId;
@@ -114,6 +115,7 @@
public void setSender(Sender<ProtocolEvent> sender)
{
this.sender = sender;
+ sender.setIdleTimeout(idleTimeout);
}
void setState(State state)
@@ -497,6 +499,20 @@
}
}
+ public void setIdleTimeout(long l)
+ {
+ idleTimeout = l;
+ if (sender != null)
+ {
+ sender.setIdleTimeout(l);
+ }
+ }
+
+ public long getIdleTimeout()
+ {
+ return idleTimeout;
+ }
+
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java Fri Jan 23 10:07:49 2009
@@ -28,6 +28,7 @@
public interface Sender<T>
{
+ void setIdleTimeout(long l);
void send(T msg);
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Jan 23 10:07:49 2009
@@ -20,7 +20,15 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.codec.BBEncoder;
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -31,13 +39,7 @@
import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.qpid.transport.network.Frame.*;
-
-import static java.lang.Math.*;
+import org.apache.qpid.transport.codec.BBEncoder;
/**
@@ -235,5 +237,9 @@
{
throw new IllegalArgumentException("" + error);
}
-
+
+ public void setIdleTimeout(long l)
+ {
+ sender.setIdleTimeout(l);
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Jan 23 10:07:49 2009
@@ -143,9 +143,6 @@
t.getMessage().equalsIgnoreCase("socket closed") &&
closed.get()))
{
- log.error(t, "===========================================================");
- log.error(t, "Exception");
- log.error(t, "===========================================================");
receiver.exception(t);
}
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Jan 23 10:07:49 2009
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.transport.network.io;
+import static org.apache.qpid.transport.util.Functions.mod;
+
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
@@ -30,8 +32,6 @@
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
-import static org.apache.qpid.transport.util.Functions.*;
-
public final class IoSender implements Runnable, Sender<ByteBuffer>
{
@@ -56,6 +56,7 @@
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
+ private long idleTimeout;
private volatile Throwable exception = null;
@@ -223,8 +224,7 @@
public void run()
{
- final int size = buffer.length;
-
+ final int size = buffer.length;
while (true)
{
final int hd = head;
@@ -294,4 +294,16 @@
}
}
+ public void setIdleTimeout(long l)
+ {
+ try
+ {
+ socket.setSoTimeout((int)l*2);
+ idleTimeout = l;
+ }
+ catch (Exception e)
+ {
+ throw new SenderException(e);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java Fri Jan 23 10:07:49 2009
@@ -24,7 +24,6 @@
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -77,5 +76,15 @@
CloseFuture closed = session.close();
closed.join();
}
-
+
+ public void setIdleTimeout(long l)
+ {
+ //noop
+ }
+
+ public long getIdleTimeout()
+ {
+ return 0;
+ }
+
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java Fri Jan 23 10:07:49 2009
@@ -118,4 +118,9 @@
}
}
}
+
+ public void setIdleTimeout(long l)
+ {
+ //noop
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java?rev=737125&r1=737124&r2=737125&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java Fri Jan 23 10:07:49 2009
@@ -232,4 +232,9 @@
{
return engineState;
}
+
+ public void setIdleTimeout(long l)
+ {
+ delegate.setIdleTimeout(l);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org