You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/21 17:12:33 UTC
svn commit: r387566 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/command/
main/java/org/apache/activemq/openwire/
main/java/org/apache/activemq/transport/
main/java/org/apache/activemq/transport/activeio/ main/java/or...
Author: chirino
Date: Tue Mar 21 08:12:31 2006
New Revision: 387566
URL: http://svn.apache.org/viewcvs?rev=387566&view=rev
Log:
http://jira.activemq.org/jira/browse/AMQ-643
The maxInactivityDuration is now negotiated using the WireFormatInfo. This makes it easier to configure connections since client and server configs do not HAVE to match up exactly.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Tue Mar 21 08:12:31 2006
@@ -240,6 +240,17 @@
public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE);
}
+
+ /**
+ * @throws IOException
+ */
+ public long getMaxInactivityDuration() throws IOException {
+ Long l = (Long) getProperty("MaxInactivityDuration");
+ return l == null ? 0 : l.longValue();
+ }
+ public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+ setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
+ }
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processWireFormat(this);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Tue Mar 21 08:12:31 2006
@@ -35,7 +35,8 @@
private boolean cacheEnabled=true;
private boolean tightEncodingEnabled=true;
private boolean sizePrefixDisabled=false;
-
+ private long maxInactivityDuration=30*1000;
+
public WireFormat createWireFormat() {
WireFormatInfo info = new WireFormatInfo();
info.setVersion(version);
@@ -46,6 +47,7 @@
info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
info.setTightEncodingEnabled(tightEncodingEnabled);
info.setSizePrefixDisabled(sizePrefixDisabled);
+ info.seMaxInactivityDuration(maxInactivityDuration);
} catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
ise.initCause(e);
@@ -104,4 +106,12 @@
public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
this.sizePrefixDisabled = sizePrefixDisabled;
}
+
+ public long getMaxInactivityDuration() {
+ return maxInactivityDuration;
+ }
+
+ public void setMaxInactivityDuration(long maxInactivityDuration) {
+ this.maxInactivityDuration = maxInactivityDuration;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Mar 21 08:12:31 2006
@@ -20,6 +20,7 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.Scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,12 +32,13 @@
*
* @version $Revision$
*/
-public class InactivityMonitor extends TransportFilter implements Runnable {
+public class InactivityMonitor extends TransportFilter {
private final Log log = LogFactory.getLog(InactivityMonitor.class);
- private final long maxInactivityDuration;
- private byte readCheckIteration=0;
+ private WireFormatInfo localWireFormatInfo;
+ private WireFormatInfo remoteWireFormatInfo;
+ private boolean monitorStarted=false;
private final AtomicBoolean commandSent=new AtomicBoolean(false);
private final AtomicBoolean inSend=new AtomicBoolean(false);
@@ -44,35 +46,29 @@
private final AtomicBoolean commandReceived=new AtomicBoolean(true);
private final AtomicBoolean inReceive=new AtomicBoolean(false);
- public InactivityMonitor(Transport next, long maxInactivityDuration) {
+ private final Runnable readChecker = new Runnable() {
+ public void run() {
+ readCheck();
+ }
+ };
+
+ private final Runnable writeChecker = new Runnable() {
+ public void run() {
+ writeCheck();
+ }
+ };
+
+
+ public InactivityMonitor(Transport next) {
super(next);
- this.maxInactivityDuration = maxInactivityDuration;
- }
-
- public void start() throws Exception {
- next.start();
- Scheduler.executePeriodically(this, maxInactivityDuration/2);
}
-
+
public void stop() throws Exception {
- Scheduler.cancel(this);
+ stopMonitorThreads();
next.stop();
}
-
- synchronized public void run() {
- switch(readCheckIteration) {
- case 0:
- writeCheck();
- readCheckIteration++;
- break;
- case 1:
- readCheck();
- writeCheck();
- readCheckIteration=0;
- break;
- }
- }
-
+
+
private void writeCheck() {
if( inSend.get() ) {
log.debug("A send is in progress");
@@ -82,7 +78,7 @@
if( !commandSent.get() ) {
log.debug("No message sent since last write check, sending a KeepAliveInfo");
try {
- next.oneway(new KeepAliveInfo());
+ next.oneway(new KeepAliveInfo());
} catch (IOException e) {
onException(e);
}
@@ -113,18 +109,35 @@
public void onCommand(Command command) {
inReceive.set(true);
try {
+ if( command.isWireFormatInfo() ) {
+ synchronized( this ) {
+ remoteWireFormatInfo = (WireFormatInfo) command;
+ try {
+ startMonitorThreads();
+ } catch (IOException e) {
+ onException(e);
+ }
+ }
+ }
getTransportListener().onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
}
}
+
public void oneway(Command command) throws IOException {
// Disable inactivity monitoring while processing a command.
inSend.set(true);
commandSent.set(true);
try {
+ if( command.isWireFormatInfo() ) {
+ synchronized( this ) {
+ localWireFormatInfo = (WireFormatInfo) command;
+ startMonitorThreads();
+ }
+ }
next.oneway(command);
} finally {
inSend.set(false);
@@ -132,7 +145,37 @@
}
public void onException(IOException error) {
- Scheduler.cancel(this);
+ stopMonitorThreads();
getTransportListener().onException(error);
}
+
+
+ synchronized private void startMonitorThreads() throws IOException {
+ if( monitorStarted )
+ return;
+ if( localWireFormatInfo == null )
+ return;
+ if( remoteWireFormatInfo == null )
+ return;
+
+ long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+ if( l > 0 ) {
+ Scheduler.executePeriodically(writeChecker, l/2);
+ Scheduler.executePeriodically(readChecker, l);
+ monitorStarted=true;
+ }
+ }
+
+ /**
+ *
+ */
+ synchronized private void stopMonitorThreads() {
+ if( monitorStarted ) {
+ Scheduler.cancel(readChecker);
+ Scheduler.cancel(writeChecker);
+ monitorStarted=false;
+ }
+ }
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -230,11 +230,13 @@
if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport);
}
+
+ transport = new InactivityMonitor(transport);
+
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
}
- transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
@@ -279,10 +281,12 @@
if( activeIOTransport.isTrace() ) {
transport = new TransportLogger(transport);
}
+
+ transport = new InactivityMonitor(transport);
+
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
}
- transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
return transport;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Mar 21 08:12:31 2006
@@ -56,7 +56,6 @@
private boolean trace;
private boolean useLocalHost = true;
private int minmumWireFormatVersion;
- private long maxInactivityDuration = 0; //30000;
private InetSocketAddress socketAddress;
@@ -204,17 +203,6 @@
*/
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
- }
-
- public long getMaxInactivityDuration() {
- return maxInactivityDuration;
- }
-
- /**
- * Sets the maximum inactivity duration
- */
- public void setMaxInactivityDuration(long maxInactivityDuration) {
- this.maxInactivityDuration = maxInactivityDuration;
}
public int getConnectionTimeout() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -64,15 +64,13 @@
transport = new TransportLogger(transport);
}
+ transport = new InactivityMonitor(transport);
+
// Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
- if( tcpTransport.getMaxInactivityDuration() > 0 ) {
- transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
- }
-
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
@@ -85,14 +83,13 @@
transport = new TransportLogger(transport);
}
+ transport = new InactivityMonitor(transport);
+
// Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
- if( tcpTransport.getMaxInactivityDuration() > 0 ) {
- transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
- }
return transport;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Tue Mar 21 08:12:31 2006
@@ -48,7 +48,7 @@
private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private TcpTransportFactory transportFactory = new TcpTransportFactory();
- private long maxInactivityDuration = 0; //30000;
+ private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion;
private boolean trace;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar 21 08:12:31 2006
@@ -57,7 +57,6 @@
private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
private ReplayBuffer replayBuffer;
private int datagramSize = 4 * 1024;
- private long maxInactivityDuration = 0; // 30000;
private SocketAddress targetAddress;
private SocketAddress originalTargetAddress;
private DatagramChannel channel;
@@ -223,23 +222,12 @@
this.trace = trace;
}
- public long getMaxInactivityDuration() {
- return maxInactivityDuration;
- }
-
public int getDatagramSize() {
return datagramSize;
}
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
- }
-
- /**
- * Sets the maximum inactivity duration
- */
- public void setMaxInactivityDuration(long maxInactivityDuration) {
- this.maxInactivityDuration = maxInactivityDuration;
}
public boolean isUseLocalHost() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -81,14 +81,12 @@
transport = new TransportLogger(transport);
}
+ transport = new InactivityMonitor(transport);
+
if (format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport);
}
- if (udpTransport.getMaxInactivityDuration() > 0) {
- transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
- }
-
return transport;
}
@@ -115,12 +113,10 @@
transport = new TransportLogger(transport);
}
+ transport = new InactivityMonitor(transport);
+
if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport);
- }
-
- if (udpTransport.getMaxInactivityDuration() > 0) {
- transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
// deal with fragmentation
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Tue Mar 21 08:12:31 2006
@@ -134,10 +134,7 @@
}
protected Transport configureTransport(Transport transport) {
- if (serverTransport.getMaxInactivityDuration() > 0) {
- transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
- }
-
+ transport = new InactivityMonitor(transport);
getAcceptListener().onAccept(transport);
return transport;
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java Tue Mar 21 08:12:31 2006
@@ -1,10 +1,28 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
@@ -32,16 +50,17 @@
public Runnable serverRunOnCommand;
public Runnable clientRunOnCommand;
- public long clientInactivityLimit;
- public long serverInactivityLimit;
-
-
protected void setUp() throws Exception {
super.setUp();
- server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+serverInactivityLimit));
- server.setAcceptListener(this);
- server.start();
- clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+clientInactivityLimit));
+ startTransportServer();
+ }
+
+ /**
+ * @throws Exception
+ * @throws URISyntaxException
+ */
+ private void startClient() throws Exception, URISyntaxException {
+ clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();
@@ -62,18 +81,37 @@
}});
clientTransport.start();
}
+
+ /**
+ * @throws IOException
+ * @throws URISyntaxException
+ * @throws Exception
+ */
+ private void startTransportServer() throws IOException, URISyntaxException, Exception {
+ server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
+ server.setAcceptListener(this);
+ server.start();
+ }
protected void tearDown() throws Exception {
ignoreClientError.set(true);
ignoreServerError.set(true);
- clientTransport.stop();
- serverTransport.stop();
- server.stop();
+ try {
+ if( clientTransport!=null )
+ clientTransport.stop();
+ if( serverTransport!=null )
+ serverTransport.stop();
+ if( server!=null )
+ server.stop();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
super.tearDown();
}
public void onAccept(Transport transport) {
try {
+ System.out.println("["+getName()+"] Server Accepted a Connection");
serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
@@ -103,12 +141,35 @@
error.printStackTrace();
}
- public void initCombosForTestClientHang() {
- addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)});
- addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
- }
public void testClientHang() throws Exception {
+ //
+ // Manually create a client transport so that it does not send KeepAlive packets.
+ // this should simulate a client hang.
+ clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616"));
+ clientTransport.setTransportListener(new TransportListener() {
+ public void onCommand(Command command) {
+ clientReceiveCount.incrementAndGet();
+ if( clientRunOnCommand !=null ) {
+ clientRunOnCommand.run();
+ }
+ }
+ public void onException(IOException error) {
+ if( !ignoreClientError.get() ) {
+ System.out.println("Client transport error:");
+ error.printStackTrace();
+ clientErrorCount.incrementAndGet();
+ }
+ }
+ public void transportInterupted() {
+ }
+ public void transportResumed() {
+ }});
+ clientTransport.start();
+ WireFormatInfo info = new WireFormatInfo();
+ info.seMaxInactivityDuration(1000);
+ clientTransport.oneway(info);
+
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());
@@ -119,42 +180,45 @@
assertTrue(serverErrorCount.get()>0);
}
- public void initCombosForTestNoClientHang() {
- addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
- addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
- }
public void testNoClientHang() throws Exception {
+ startClient();
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());
Thread.sleep(4000);
- if( clientErrorCount.get() > 0 )
- assertEquals(0, clientErrorCount.get());
- if( serverErrorCount.get() > 0 )
- assertEquals(0, serverErrorCount.get());
+ assertEquals(0, clientErrorCount.get());
+ assertEquals(0, serverErrorCount.get());
}
/**
* Used to test when a operation blocks. This should
* not cause transport to get disconnected.
+ * @throws Exception
+ * @throws URISyntaxException
*/
- public void initCombosForTestNoClientHangWithServerBlock() {
+ public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
+
+ startClient();
+
addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() {
public void run() {
try {
System.out.println("Sleeping");
- Thread.sleep(2000);
+ Thread.sleep(4000);
} catch (InterruptedException e) {
}
}
}});
}
+
public void testNoClientHangWithServerBlock() throws Exception {
+ startClient();
+
assertEquals(0, serverErrorCount.get());
assertEquals(0, clientErrorCount.get());