You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/14 14:09:57 UTC
svn commit: r1058997 - in
/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp:
SslTransport.java TcpTransport.java
Author: chirino
Date: Fri Jan 14 13:09:56 2011
New Revision: 1058997
URL: http://svn.apache.org/viewvc?rev=1058997&view=rev
Log:
Getting ssl and the rate limited tcp bits working better.
Modified:
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java?rev=1058997&r1=1058996&r2=1058997&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java Fri Jan 14 13:09:56 2011
@@ -14,11 +14,10 @@ import java.nio.channels.WritableByteCha
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
/**
@@ -113,19 +112,23 @@ public class SslTransport extends TcpTra
super.onConnected();
engine.setWantClientAuth(true);
engine.beginHandshake();
- handshake_done();
+ handshake();
}
@Override
protected void drainOutbound() {
- if ( handshake_done() ) {
+ if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+ handshake();
+ } else {
super.drainOutbound();
}
}
@Override
protected void drainInbound() {
- if ( handshake_done() ) {
+ if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+ handshake();
+ } else {
super.drainInbound();
}
}
@@ -137,10 +140,11 @@ public class SslTransport extends TcpTra
protected boolean flush() throws IOException {
while (true) {
if(writeFlushing) {
- super.writeChannel().write(writeBuffer);
+ int count = super.writeChannel().write(writeBuffer);
if( !writeBuffer.hasRemaining() ) {
writeBuffer.clear();
writeFlushing = false;
+ suspendWrite();
return true;
} else {
return false;
@@ -149,6 +153,7 @@ public class SslTransport extends TcpTra
if( writeBuffer.position()!=0 ) {
writeBuffer.flip();
writeFlushing = true;
+ resumeWrite();
} else {
return true;
}
@@ -170,6 +175,13 @@ public class SslTransport extends TcpTra
break;
}
}
+ if( plain.remaining()==0 && engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+ dispatchQueue.execute(new Runnable() {
+ public void run() {
+ handshake();
+ }
+ });
+ }
return rc;
}
@@ -227,6 +239,13 @@ public class SslTransport extends TcpTra
return rc;
}
case OK:
+ if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+ dispatchQueue.execute(new Runnable() {
+ public void run() {
+ handshake();
+ }
+ });
+ }
break;
case BUFFER_UNDERFLOW:
readBuffer.compact();
@@ -240,8 +259,11 @@ public class SslTransport extends TcpTra
return rc;
}
- public boolean handshake_done() {
- while (true) {
+ public void handshake() {
+ try {
+ if( !flush() ) {
+ return;
+ }
switch (engine.getHandshakeStatus()) {
case NEED_TASK:
final Runnable task = engine.getDelegatedTask();
@@ -252,50 +274,33 @@ public class SslTransport extends TcpTra
dispatchQueue.execute(new Runnable() {
public void run() {
if (isConnected()) {
- handshake_done();
+ handshake();
}
}
});
}
});
- return false;
}
break;
case NEED_WRAP:
- try {
- secure_write(ByteBuffer.allocate(0));
- if( writeFlushing && writeSource.isSuspended() ) {
- writeSource.resume();
- return false;
- }
- } catch(IOException e) {
- onTransportFailure(e);
- }
+ secure_write(ByteBuffer.allocate(0));
break;
case NEED_UNWRAP:
- try {
- secure_read(ByteBuffer.allocate(0));
- if( readUnderflow && readSource.isSuspended() ) {
- readSource.resume();
- return false;
- }
- } catch(IOException e) {
- onTransportFailure(e);
- return true;
- }
+ secure_read(ByteBuffer.allocate(0));
break;
case FINISHED:
-
case NOT_HANDSHAKING:
- return true;
+ break;
default:
- SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus();
- System.out.println("Unexpected ssl engine handshake status: "+ status);
+ System.err.println("Unexpected ssl engine handshake status: "+ engine.getHandshakeStatus());
+ break;
}
+ } catch (IOException e ) {
+ onTransportFailure(e);
}
}
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1058997&r1=1058996&r2=1058997&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Fri Jan 14 13:09:56 2011
@@ -173,8 +173,8 @@ public class TcpTransport extends JavaBa
protected SocketState socketState = new DISCONNECTED();
protected DispatchQueue dispatchQueue;
- protected DispatchSource readSource;
- protected DispatchSource writeSource;
+ private DispatchSource readSource;
+ private DispatchSource writeSource;
protected boolean useLocalHost = true;
protected boolean full = false;
@@ -186,11 +186,27 @@ public class TcpTransport extends JavaBa
class RateLimitingChannel implements ReadableByteChannel, WritableByteChannel {
int read_allowance = max_read_rate;
+ boolean read_suspended = false;
+ int read_resume_counter = 0;
int write_allowance = max_write_rate;
+ boolean write_suspended = false;
public void resetAllowance() {
- read_allowance = max_read_rate;
- write_allowance = max_write_rate;
+ if( read_allowance != max_read_rate || write_allowance != max_write_rate) {
+ read_allowance = max_read_rate;
+ write_allowance = max_write_rate;
+ if( write_suspended ) {
+ write_suspended = false;
+ resumeWrite();
+ }
+ if( read_suspended ) {
+ read_suspended = false;
+ resumeRead();
+ for( int i=0; i < read_resume_counter ; i++ ) {
+ resumeRead();
+ }
+ }
+ }
}
public int read(ByteBuffer dst) throws IOException {
@@ -207,13 +223,22 @@ public class TcpTransport extends JavaBa
reduction = remaining - read_allowance;
dst.limit(dst.limit() - reduction);
}
+ int rc=0;
try {
- return channel.read(dst);
+ rc = channel.read(dst);
+ read_allowance -= rc;
} finally {
if( reduction!=0 ) {
+ if( dst.remaining() == 0 ) {
+ // we need to suspend the read now until we get
+ // a new allowance..
+ readSource.suspend();
+ read_suspended = true;
+ }
dst.limit(dst.limit() + reduction);
}
}
+ return rc;
}
}
@@ -231,13 +256,22 @@ public class TcpTransport extends JavaBa
reduction = remaining - write_allowance;
src.limit(src.limit() - reduction);
}
+ int rc = 0;
try {
- return channel.write(src);
+ rc = channel.write(src);
+ write_allowance -= rc;
} finally {
if( reduction!=0 ) {
+ if( src.remaining() == 0 ) {
+ // we need to suspend the write now until we get
+ // a new allowance..
+ write_suspended = true;
+ suspendWrite();
+ }
src.limit(src.limit() + reduction);
}
}
+ return rc;
}
}
@@ -249,6 +283,14 @@ public class TcpTransport extends JavaBa
channel.close();
}
+ public void resumeRead() {
+ if( read_suspended ) {
+ read_resume_counter += 1;
+ } else {
+ _resumeRead();
+ }
+ }
+
}
private final Runnable CANCEL_HANDLER = new Runnable() {
@@ -345,6 +387,7 @@ public class TcpTransport extends JavaBa
});
readSource.setCancelHandler(CANCEL_HANDLER);
readSource.resume();
+
} else if (socketState.is(CONNECTED.class) ) {
dispatchQueue.dispatchAsync(new Runnable() {
public void run() {
@@ -357,7 +400,7 @@ public class TcpTransport extends JavaBa
}
});
} else {
- System.err.println("cannot be started. socket state is: "+socketState);
+ System.err.println("cannot be started. socket state is: "+socketState);
}
} finally {
if( onCompleted!=null ) {
@@ -461,9 +504,11 @@ public class TcpTransport extends JavaBa
switch (rc ) {
case FULL:
return false;
- case WAS_EMPTY:
- writeSource.resume();
default:
+ if( drained ) {
+ drained = false;
+ resumeWrite();
+ }
return true;
}
} catch (IOException e) {
@@ -473,6 +518,8 @@ public class TcpTransport extends JavaBa
}
+
+ boolean drained = true;
/**
*
*/
@@ -483,8 +530,11 @@ public class TcpTransport extends JavaBa
}
try {
if( codec.flush() == ProtocolCodec.BufferState.EMPTY && flush() ) {
- writeSource.suspend();
- listener.onRefill();
+ if( !drained ) {
+ drained = true;
+ suspendWrite();
+ listener.onRefill();
+ }
}
} catch (IOException e) {
onTransportFailure(e);
@@ -559,10 +609,33 @@ public class TcpTransport extends JavaBa
public void resumeRead() {
if( isConnected() && readSource!=null ) {
- readSource.resume();
+ if( rateLimitingChannel!=null ) {
+ rateLimitingChannel.resumeRead();
+ } else {
+ _resumeRead();
+ }
+ }
+ }
+ private void _resumeRead() {
+ readSource.resume();
+ dispatchQueue.execute(new Runnable(){
+ public void run() {
+ drainInbound();
+ }
+ });
+ }
+
+ protected void suspendWrite() {
+ if( isConnected() && writeSource!=null ) {
+ writeSource.suspend();
+ }
+ }
+ protected void resumeWrite() {
+ if( isConnected() && writeSource!=null ) {
+ writeSource.resume();
dispatchQueue.execute(new Runnable(){
public void run() {
- drainInbound();
+ drainOutbound();
}
});
}