You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2012/11/30 12:55:35 UTC
svn commit: r1415591 [2/3] - in
/qpid/branches/java-broker-config-qpid-4390/qpid/java: ./
amqp-1-0-client-jms/
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client/ amqp-1-0-common/ bdbstore/bin/ bdbstore/jmx/ bdbstore/j...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Nov 30 11:55:29 2012
@@ -43,6 +43,8 @@ public class ProtocolEngine_0_10 extend
private ServerConnection _connection;
private long _createTime = System.currentTimeMillis();
+ private long _lastReadTime;
+ private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
@@ -68,13 +70,61 @@ public class ProtocolEngine_0_10 extend
{
_network = network;
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
_connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
+ private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ {
+ return new Sender<ByteBuffer>()
+ {
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ sender.setIdleTimeout(i);
+
+ }
+
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ sender.send(msg);
+
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush();
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close();
+
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -87,6 +137,7 @@ public class ProtocolEngine_0_10 extend
public void received(final ByteBuffer buf)
{
+ _lastReadTime = System.currentTimeMillis();
super.received(buf);
_connection.receivedComplete();
}
@@ -103,7 +154,7 @@ public class ProtocolEngine_0_10 extend
public void writerIdle()
{
- //Todo
+ _connection.doHeartbeat();
}
public void readerIdle()
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Fri Nov 30 11:55:29 2012
@@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implem
//private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -97,10 +99,14 @@ public class ProtocolEngine_1_0_0 implem
- public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
+ public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, final IApplicationRegistry appRegistry, long id)
{
_appRegistry = appRegistry;
_connectionId = id;
+ if(networkDriver != null)
+ {
+ setNetworkConnection(networkDriver, networkDriver.getSender());
+ }
}
@@ -178,6 +184,7 @@ public class ProtocolEngine_1_0_0 implem
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -320,6 +327,7 @@ public class ProtocolEngine_1_0_0 implem
synchronized(_sendLock)
{
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -374,4 +382,13 @@ public class ProtocolEngine_1_0_0 implem
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Fri Nov 30 11:55:29 2012
@@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL i
{
private long _readBytes;
private long _writtenBytes;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -221,6 +224,7 @@ public class ProtocolEngine_1_0_0_SASL i
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -363,7 +367,7 @@ public class ProtocolEngine_1_0_0_SASL i
synchronized(_sendLock)
{
-
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -424,4 +428,13 @@ public class ProtocolEngine_1_0_0_SASL i
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Nov 30 11:55:29 2012
@@ -41,6 +41,14 @@ import org.apache.qpid.server.virtualhos
public class AMQQueueFactory
{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty("x-qpid-maximum-message-age")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-size")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-count")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumQueueDepth(value);
+ }
+ },
+ new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty("x-qpid-capacity")
+ new QueueLongProperty(X_QPID_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty("x-qpid-flow-resume-capacity")
+ new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java Fri Nov 30 11:55:29 2012
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.security.Principal;
import java.util.HashMap;
import java.util.Hashtable;
+
+import javax.naming.AuthenticationException;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
@@ -40,6 +42,7 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.plain.PlainPasswordCallback;
@@ -119,33 +122,74 @@ public class SimpleLDAPAuthenticationMan
@Override
public AuthenticationResult authenticate(String username, String password)
{
-
try
{
- return doLDAPNameAuthentication(getNameFromId(username), password);
+ AuthenticationResult result = doLDAPNameAuthentication(getNameFromId(username), password);
+ if(result.getStatus() == AuthenticationStatus.SUCCESS)
+ {
+ //Return a result based on the supplied username rather than the search name
+ return new AuthenticationResult(new UsernamePrincipal(username));
+ }
+ else
+ {
+ return result;
+ }
}
catch (NamingException e)
{
-
return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
-
}
}
- private AuthenticationResult doLDAPNameAuthentication(String username, String password) throws NamingException
+ private AuthenticationResult doLDAPNameAuthentication(String name, String password)
{
+ if(name == null)
+ {
+ //The search didn't return anything, class as not-authenticated before it NPEs below
+ return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ }
+
Hashtable<Object,Object> env = new Hashtable<Object,Object>();
env.put(Context.INITIAL_CONTEXT_FACTORY, _ldapContextFactory);
env.put(Context.PROVIDER_URL, _providerAuthURL);
env.put(Context.SECURITY_AUTHENTICATION, "simple");
- env.put(Context.SECURITY_PRINCIPAL, username);
+ env.put(Context.SECURITY_PRINCIPAL, name);
env.put(Context.SECURITY_CREDENTIALS, password);
- DirContext ctx = new InitialDirContext(env);
- ctx.close();
- return new AuthenticationResult(new UsernamePrincipal(username));
+ DirContext ctx = null;
+ try
+ {
+ ctx = new InitialDirContext(env);
+
+ //Authentication succeeded
+ return new AuthenticationResult(new UsernamePrincipal(name));
+ }
+ catch(AuthenticationException ae)
+ {
+ //Authentication failed
+ return new AuthenticationResult(AuthenticationStatus.CONTINUE);
+ }
+ catch (NamingException e)
+ {
+ //Some other failure
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+ }
+ finally
+ {
+ if(ctx != null)
+ {
+ try
+ {
+ ctx.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception closing InitialDirContext", e);
+ }
+ }
+ }
}
@Override
@@ -190,19 +234,11 @@ public class SimpleLDAPAuthenticationMan
}
catch (NamingException e)
{
- _logger.info("SASL Authentication Error", e);
+ _logger.warn("SASL Authentication Exception", e);
}
if(password != null)
{
- try
- {
- authenticated = doLDAPNameAuthentication(name, password);
-
- }
- catch (NamingException e)
- {
- authenticated = new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
- }
+ authenticated = doLDAPNameAuthentication(name, password);
}
}
else if (callback instanceof PlainPasswordCallback)
@@ -210,17 +246,10 @@ public class SimpleLDAPAuthenticationMan
password = ((PlainPasswordCallback)callback).getPlainPassword();
if(name != null)
{
- try
- {
- authenticated = doLDAPNameAuthentication(name, password);
- if(authenticated.getStatus()== AuthenticationResult.AuthenticationStatus.SUCCESS)
- {
- ((PlainPasswordCallback)callback).setAuthenticated(true);
- }
- }
- catch (NamingException e)
+ authenticated = doLDAPNameAuthentication(name, password);
+ if(authenticated.getStatus()== AuthenticationResult.AuthenticationStatus.SUCCESS)
{
- authenticated = new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
+ ((PlainPasswordCallback)callback).setAuthenticated(true);
}
}
}
@@ -242,7 +271,6 @@ public class SimpleLDAPAuthenticationMan
env.put(Context.INITIAL_CONTEXT_FACTORY, _ldapContextFactory);
env.put(Context.PROVIDER_URL, _providerSearchURL);
-
env.put(Context.SECURITY_AUTHENTICATION, "none");
DirContext ctx = null;
@@ -267,7 +295,14 @@ public class SimpleLDAPAuthenticationMan
}
finally
{
- ctx.close();
+ try
+ {
+ ctx.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception closing InitialDirContext", e);
+ }
}
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Nov 30 11:55:29 2012
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.Executi
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -68,6 +69,7 @@ public class ServerConnection extends Co
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
+ private NetworkConnection _networkConnection;
public ServerConnection(final long connectionId)
{
@@ -490,4 +492,20 @@ public class ServerConnection extends Co
{
super.setLocalAddress(localAddress);
}
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+
+ public void doHeartbeat()
+ {
+ super.doHeartBeat();
+
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -43,6 +43,8 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -228,14 +230,18 @@ public class ServerConnectionDelegate ex
return;
}
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
+ if(ok.hasHeartbeat())
+ {
+ final int heartbeat = ok.getHeartbeat();
+ if(heartbeat > 0)
+ {
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+ }
+ }
- @Override
- protected int getHeartbeatMax()
- {
- //TODO: implement broker support for actually sending heartbeats
- return 0;
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Fri Nov 30 11:55:29 2012
@@ -20,11 +20,31 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.State;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -42,6 +62,14 @@ public class MultiVersionProtocolEngineF
_appRegistry = new TestApplicationRegistry(new XMLConfiguration());
ApplicationRegistry.initialise(_appRegistry);
+ // AMQP 1-0 connection needs default vhost to be present
+ IApplicationRegistry registry = ApplicationRegistry.getInstance();
+ VirtualHostRegistry virtualHostRegistry = registry.getVirtualHostRegistry();
+ VirtualHostImpl vhostImpl = new VirtualHostImpl(virtualHostRegistry, registry, registry.getSecurityManager(),
+ new VirtualHostConfiguration("default",new XMLConfiguration(), registry.getBroker()));
+ virtualHostRegistry.registerVirtualHost(vhostImpl);
+ virtualHostRegistry.setDefaultVirtualHostName("default");
+
}
protected void tearDown()
@@ -161,6 +189,7 @@ public class MultiVersionProtocolEngineF
assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
previousId = expectedID;
+ engine.closed();
}
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/client.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Nov 30 11:55:29 2012
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Close
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
* @return true if the feature is supported by the server
*/
boolean isSupportedServerFeature(final String featureName);
+
+ void setHeartbeatListener(HeartbeatListener listener);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Nov 30 11:55:29 2012
@@ -33,6 +33,7 @@ import org.apache.qpid.configuration.Cli
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
@@ -214,7 +215,8 @@ public class AMQConnectionDelegate_0_10
+ "********");
}
- ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail);
+ ConnectionSettings conSettings = retrieveConnectionSettings(brokerDetail);
+
_qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL()));
_qpidConnection.connect(conSettings);
@@ -420,7 +422,13 @@ public class AMQConnectionDelegate_0_10
return featureSupported;
}
- private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+ }
+
+ private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
@@ -442,6 +450,24 @@ public class AMQConnectionDelegate_0_10
conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+ //Check connection-level ssl override setting
+ String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+ if(connectionSslOption != null)
+ {
+ boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+ boolean brokerlistUseSsl = conSettings.isUseSSL();
+
+ if( connUseSsl != brokerlistUseSsl)
+ {
+ conSettings.setUseSSL(connUseSsl);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+ }
+ }
+ }
+
return conSettings;
}
@@ -464,10 +490,14 @@ public class AMQConnectionDelegate_0_10
heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
_logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
}
- else
+ else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
}
+ else
+ {
+ heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
+ }
return heartbeat;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Nov 30 11:55:29 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.TxSelectB
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -100,32 +101,30 @@ public class AMQConnectionDelegate_8_0 i
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
settings.setProtocol(brokerDetail.getTransport());
- SSLContext sslContext = null;
- if (settings.isUseSSL())
+ //Check connection-level ssl override setting
+ String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
+ if(connectionSslOption != null)
{
- try
- {
- sslContext = SSLContextFactory.buildClientContext(
- settings.getTrustStorePath(),
- settings.getTrustStorePassword(),
- settings.getTrustStoreType(),
- settings.getTrustManagerFactoryAlgorithm(),
- settings.getKeyStorePath(),
- settings.getKeyStorePassword(),
- settings.getKeyStoreType(),
- settings.getKeyManagerFactoryAlgorithm(),
- settings.getCertAlias());
- }
- catch (GeneralSecurityException e)
+ boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
+ boolean brokerlistUseSsl = settings.isUseSSL();
+
+ if( connUseSsl != brokerlistUseSsl)
{
- throw new AMQException("Unable to create SSLContext: " + e.getMessage(), e);
+ settings.setUseSSL(connUseSsl);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
+ }
}
}
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
+
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+ _conn.getProtocolHandler());
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
@@ -379,4 +378,10 @@ public class AMQConnectionDelegate_8_0 i
// we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
+
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _conn.getProtocolHandler().setHeartbeatListener(listener);
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Fri Nov 30 11:55:29 2012
@@ -196,7 +196,14 @@ public class JMSObjectMessage extends Ab
if (data != null && data.hasRemaining())
{
ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new ByteBufferInputStream(data));
- result = (Serializable) in.readObject();
+ try
+ {
+ result = (Serializable) in.readObject();
+ }
+ finally
+ {
+ in.close();
+ }
}
return result;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Nov 30 11:55:29 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,6 +179,9 @@ public class AMQProtocolHandler implemen
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -300,7 +304,6 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -309,7 +312,7 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -442,6 +445,7 @@ public class AMQProtocolHandler implemen
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -470,8 +474,6 @@ public class AMQProtocolHandler implemen
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -560,6 +562,7 @@ public class AMQProtocolHandler implemen
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -882,6 +885,18 @@ public class AMQProtocolHandler implemen
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
protected Sender<ByteBuffer> getSender()
{
return _sender;
@@ -894,7 +909,6 @@ public class AMQProtocolHandler implemen
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -909,5 +923,13 @@ public class AMQProtocolHandler implemen
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Nov 30 11:55:29 2012
@@ -267,7 +267,7 @@ public class AMQProtocolSession implemen
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Fri Nov 30 11:55:29 2012
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
}
private final ConnectionURL _connectionURL;
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
return null;
}
+
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ {
+ // ClientDelegate simply responds to heartbeats with heartbeats
+ _heartbeatListener.heartbeatReceived();
+ super.connectionHeartbeat(conn, hearbeat);
+ _heartbeatListener.heartbeatSent();
+ }
+
+
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Nov 30 11:55:29 2012
@@ -44,6 +44,13 @@ public interface ConnectionURL
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
/**
+ * This option is used to apply a connection level override of
+ * the {@value BrokerDetails#OPTIONS_SSL} option values in the
+ * {@value ConnectionURL#OPTIONS_BROKERLIST};
+ */
+ public static final String OPTIONS_SSL = "ssl";
+
+ /**
* This option is only applicable for 0-8/0-9/0-9-1 protocols connection
* <p>
* It tells the client to delegate the requeue/DLQ decision to the
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Fri Nov 30 11:55:29 2012
@@ -143,4 +143,25 @@ public class BrokerDetailsTest extends T
assertEquals("Unexpected toString", expectedToString, actualToString);
}
+
+ public void testDefaultSsl() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertNull("default value should be null", broker.getProperty(BrokerDetails.OPTIONS_SSL));
+ }
+
+ public void testOverridingSsl() throws URLSyntaxException
+ {
+ String brokerURL = "tcp://localhost:5672?ssl='true'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+
+ assertTrue("value should be true", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+
+ brokerURL = "tcp://localhost:5672?ssl='false''&maxprefetch='1'";
+ broker = new AMQBrokerDetails(brokerURL);
+
+ assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Fri Nov 30 11:55:29 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.url.URLSyntaxExce
public class ConnectionURLTest extends TestCase
{
-
public void testFailoverURL() throws URLSyntaxException
{
String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
@@ -563,5 +562,34 @@ public class ConnectionURLTest extends T
assertNull("Reject behaviour option was not as expected",
connectionurl.getOption(ConnectionURL.OPTIONS_REJECT_BEHAVIOUR));
}
+
+ /**
+ * Verify that when the ssl option is not specified, asking for the option returns null,
+ * such that this can later be used to verify it wasnt specified.
+ */
+ public void testDefaultSsl() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&foo='bar'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertNull("default ssl value should be null", connectionURL.getOption(ConnectionURL.OPTIONS_SSL));
+ }
+
+ /**
+ * Verify that when the ssl option is specified, asking for the option returns the value,
+ * such that this can later be used to verify what value it was specified as.
+ */
+ public void testOverridingSsl() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='true'";
+ ConnectionURL connectionURL = new AMQConnectionURL(url);
+
+ assertTrue("value should be true", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+
+ url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'&ssl='false'";
+ connectionURL = new AMQConnectionURL(url);
+
+ assertFalse("value should be false", Boolean.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_SSL)));
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common.xml Fri Nov 30 11:55:29 2012
@@ -23,7 +23,10 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.19"/>
+ <!-- Version used for standard build output -->
+ <property name="project.version" value="0.21"/>
+ <!-- The release version used for maven output. SNAPSHOT added via maven.version.suffix -->
+ <property name="project.version.maven" value="0.22"/>
<property name="project.url" value="http://qpid.apache.org"/>
<property name="project.groupid" value="org.apache.qpid"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
@@ -42,7 +45,7 @@
<property name="build.report" location="${build}/report"/>
<property name="build.release" location="${build}/release"/>
<property name="build.release.prepare" location="${build.release}/prepare"/>
- <property name="build.plugins" location="${build}/lib/plugins"/>
+ <property name="build.lib.broker.plugins" location="${build}/lib/broker-plugins"/>
<property name="build.coverage.report" location="${build}/coverage/report"/>
<property name="build.coverage.src" location="${build}/coverage/src"/>
<property name="build.findbugs" location="${build}/findbugs"/>
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/bin/qpid-run Fri Nov 30 11:55:29 2012
@@ -88,10 +88,10 @@ SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPI
if [ -n "$QPID_LOG_PREFIX" ]; then
if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then
log $INFO Using pid in qpid log name prefix
- LOG_PREFIX=" -Dlogprefix=$$"
+ LOG_PREFIX="-Dlogprefix=$$"
else
log $INFO Using qpid logprefix property
- LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
+ LOG_PREFIX="-Dlogprefix=$QPID_LOG_PREFIX"
fi
SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_PREFIX}"
fi
@@ -99,10 +99,10 @@ fi
if [ -n "$QPID_LOG_SUFFIX" ]; then
if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then
log $INFO Using pid in qpid log name suffix
- LOG_SUFFIX=" -Dlogsuffix=$$"
+ LOG_SUFFIX="-Dlogsuffix=$$"
else
log $INFO Using qpig logsuffix property
- LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
+ LOG_SUFFIX="-Dlogsuffix=$QPID_LOG_SUFFIX"
fi
SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_SUFFIX}"
fi
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/common.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Fri Nov 30 11:55:29 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.protocol;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -31,7 +32,7 @@ import java.nio.ByteBuffer;
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -56,6 +57,6 @@ public interface ProtocolEngine extends
void readerIdle();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
}
\ No newline at end of file
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Nov 30 11:55:29 2012
@@ -21,12 +21,7 @@
package org.apache.qpid.transport;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.*;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
@@ -73,6 +68,9 @@ public class Connection extends Connecti
//Usable channels are numbered 0 to <ChannelMax> - 1
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+ private long _lastSendTime;
+ private long _lastReadTime;
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -231,7 +229,8 @@ public class Connection extends Connecti
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver, null);
+ NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+
setRemoteAddress(network.getRemoteAddress());
setLocalAddress(network.getLocalAddress());
@@ -368,6 +367,7 @@ public class Connection extends Connecti
public void received(ProtocolEvent event)
{
+ _lastReadTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("RECV: [%s] %s", this, event);
@@ -377,6 +377,7 @@ public class Connection extends Connecti
public void send(ProtocolEvent event)
{
+ _lastSendTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("SEND: [%s] %s", this, event);
@@ -745,4 +746,38 @@ public class Connection extends Connecti
sessionDetached.setCode(sessionDetachCode);
invoke(sessionDetached);
}
+
+
+ protected void doHeartBeat()
+ {
+ connectionHeartbeat();
+ }
+
+ private class ConnectionActivity implements TransportActivity
+ {
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastSendTime;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ connectionHeartbeat();
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ // TODO
+
+ }
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext;
public interface IncomingNetworkTransport extends NetworkTransport
{
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext);
}
\ No newline at end of file
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -50,4 +50,8 @@ public interface NetworkConnection
void setPeerPrincipal(Principal principal);
Principal getPeerPrincipal();
+
+ int getMaxReadIdle();
+
+ int getMaxWriteIdle();
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -23,12 +23,13 @@ package org.apache.qpid.transport.networ
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
-import javax.net.ssl.SSLContext;
import java.nio.ByteBuffer;
public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity);
}
\ No newline at end of file
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +40,23 @@ public class IoNetworkConnection impleme
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
private Principal _principal;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
- int sendBufferSize, int receiveBufferSize, long timeout)
+ int sendBufferSize, int receiveBufferSize, long timeout)
+ {
+ this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null);
+ }
+
+ public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
_timeout = timeout;
_ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioReceiver.setTicker(ticker);
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
@@ -88,14 +99,12 @@ public class IoNetworkConnection impleme
public void setMaxWriteIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxWriteIdle = sec;
}
public void setMaxReadIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxReadIdle = sec;
}
@Override
@@ -109,4 +118,16 @@ public class IoNetworkConnection impleme
{
return _principal;
}
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Nov 30 11:55:29 2012
@@ -41,9 +41,8 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.*;
+
import org.slf4j.LoggerFactory;
public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -56,7 +55,9 @@ public class IoNetworkTransport implemen
private IoNetworkConnection _connection;
private AcceptingThread _acceptor;
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
{
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
@@ -91,7 +92,9 @@ public class IoNetworkTransport implemen
try
{
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT);
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
_connection.start();
}
catch(Exception e)
@@ -128,7 +131,9 @@ public class IoNetworkTransport implemen
return _connection;
}
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
{
try
{
@@ -149,6 +154,7 @@ public class IoNetworkTransport implemen
private ProtocolEngineFactory _factory;
private SSLContext _sslContext;
private ServerSocket _serverSocket;
+ private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
@@ -157,6 +163,7 @@ public class IoNetworkTransport implemen
_config = config;
_factory = factory;
_sslContext = sslContext;
+ _timeout = TIMEOUT;
InetSocketAddress address = config.getAddress();
@@ -217,6 +224,7 @@ public class IoNetworkTransport implemen
{
socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(_timeout);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -226,7 +234,10 @@ public class IoNetworkTransport implemen
ProtocolEngine engine = _factory.newProtocolEngine();
- NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT);
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+ ticker);
+ ticker.setConnection(connection);
if(_sslContext != null)
{
@@ -293,6 +304,7 @@ public class IoNetworkTransport implemen
}
}
}
+
}
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Nov 30 11:55:29 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLSocket;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +53,8 @@ final class IoReceiver implements Runnab
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
private static final boolean shutdownBroken;
+
+ private Ticker _ticker;
static
{
String osName = System.getProperty("os.name");
@@ -136,7 +140,7 @@ final class IoReceiver implements Runnab
{
final int threshold = bufferSize / 2;
- // I set the read buffer size simillar to SO_RCVBUF
+ // I set the read buffer size similar to SO_RCVBUF
// Haven't tested with a lower value to see if it's better or worse
byte[] buffer = new byte[bufferSize];
try
@@ -144,17 +148,64 @@ final class IoReceiver implements Runnab
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ long currentTime;
+ while(read != -1)
{
- if (read > 0)
+ try
{
- ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
- receiver.received(b);
- offset+=read;
- if (offset > threshold)
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
{
- offset = 0;
- buffer = new byte[bufferSize];
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ currentTime = System.currentTimeMillis();
+
+ if(_ticker != null)
+ {
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+ try
+ {
+ if(!socket.isClosed())
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick);
+ }
+ }
+ catch(SocketException e)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ catch (SocketTimeoutException e)
+ {
+ currentTime = System.currentTimeMillis();
+ if(_ticker != null)
+ {
+ final int tick = _ticker.tick(currentTime);
+ if(!socket.isClosed())
+ {
+ try
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick );
+ }
+ catch(SocketException ex)
+ {
+ // ignore - closed socket
+ }
+ }
}
}
}
@@ -195,4 +246,15 @@ final class IoReceiver implements Runnab
return !brokenClose && !sslSocketClosed;
}
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public void setTicker(Ticker ticker)
+ {
+ _ticker = ticker;
+ }
+
+
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java Fri Nov 30 11:55:29 2012
@@ -83,6 +83,18 @@ public class TestNetworkConnection imple
return null;
}
+ @Override
+ public int getMaxReadIdle()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return 0;
+ }
+
public void setMaxWriteIdle(int idleTime)
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Fri Nov 30 11:55:29 2012
@@ -128,7 +128,8 @@ public class TransportTest extends QpidT
}
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate, SSLContext sslContext)
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
{
throw new UnsupportedOperationException();
}
@@ -148,7 +149,7 @@ public class TransportTest extends QpidT
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContext sslContext)
+ ProtocolEngineFactory factory, SSLContext sslContext)
{
throw new UnsupportedOperationException();
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd?rev=1415591&r1=1415590&r2=1415591&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/management-common.bnd Fri Nov 30 11:55:29 2012
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-management-common
Bundle-Version: ${ver}
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:r1411034-1415148
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org