You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/14 14:09:57 UTC

svn commit: r1058997 - in /activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp: SslTransport.java TcpTransport.java

Author: chirino
Date: Fri Jan 14 13:09:56 2011
New Revision: 1058997

URL: http://svn.apache.org/viewvc?rev=1058997&view=rev
Log:
Getting SSL and the rate-limited TCP bits working better.

Modified:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java?rev=1058997&r1=1058996&r2=1058997&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java Fri Jan 14 13:09:56 2011
@@ -14,11 +14,10 @@ import java.nio.channels.WritableByteCha
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
 
 /**
@@ -113,19 +112,23 @@ public class SslTransport extends TcpTra
         super.onConnected();
         engine.setWantClientAuth(true);
         engine.beginHandshake();
-        handshake_done();
+        handshake();
     }
 
     @Override
     protected void drainOutbound() {
-        if ( handshake_done() ) {
+        if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+            handshake();
+        } else {
             super.drainOutbound();
         }
     }
 
     @Override
     protected void drainInbound() {
-        if ( handshake_done() ) {
+        if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+            handshake();
+        } else {
             super.drainInbound();
         }
     }
@@ -137,10 +140,11 @@ public class SslTransport extends TcpTra
     protected boolean flush() throws IOException {
         while (true) {
             if(writeFlushing) {
-                super.writeChannel().write(writeBuffer);
+                int count = super.writeChannel().write(writeBuffer);
                 if( !writeBuffer.hasRemaining() ) {
                     writeBuffer.clear();
                     writeFlushing = false;
+                    suspendWrite();
                     return true;
                 } else {
                     return false;
@@ -149,6 +153,7 @@ public class SslTransport extends TcpTra
                 if( writeBuffer.position()!=0 ) {
                     writeBuffer.flip();
                     writeFlushing = true;
+                    resumeWrite();
                 } else {
                     return true;
                 }
@@ -170,6 +175,13 @@ public class SslTransport extends TcpTra
                 break;
             }
         }
+        if( plain.remaining()==0 && engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+            dispatchQueue.execute(new Runnable() {
+                public void run() {
+                    handshake();
+                }
+            });
+        }
         return rc;
     }
 
@@ -227,6 +239,13 @@ public class SslTransport extends TcpTra
                             return rc;
                         }
                     case OK:
+                        if ( engine.getHandshakeStatus()!=NOT_HANDSHAKING ) {
+                            dispatchQueue.execute(new Runnable() {
+                                public void run() {
+                                    handshake();
+                                }
+                            });
+                        }
                         break;
                     case BUFFER_UNDERFLOW:
                         readBuffer.compact();
@@ -240,8 +259,11 @@ public class SslTransport extends TcpTra
         return rc;
     }
 
-    public boolean handshake_done() {
-        while (true) {
+    public void handshake() {
+        try {
+            if( !flush() ) {
+                return;
+            }
             switch (engine.getHandshakeStatus()) {
                 case NEED_TASK:
                     final Runnable task = engine.getDelegatedTask();
@@ -252,50 +274,33 @@ public class SslTransport extends TcpTra
                                 dispatchQueue.execute(new Runnable() {
                                     public void run() {
                                         if (isConnected()) {
-                                            handshake_done();
+                                            handshake();
                                         }
                                     }
                                 });
                             }
                         });
-                        return false;
                     }
                     break;
 
                 case NEED_WRAP:
-                    try {
-                        secure_write(ByteBuffer.allocate(0));
-                        if( writeFlushing && writeSource.isSuspended() ) {
-                            writeSource.resume();
-                            return false;
-                        }
-                    } catch(IOException e) {
-                        onTransportFailure(e);
-                    }
+                    secure_write(ByteBuffer.allocate(0));
                     break;
 
                 case NEED_UNWRAP:
-                    try {
-                        secure_read(ByteBuffer.allocate(0));
-                        if( readUnderflow && readSource.isSuspended() ) {
-                            readSource.resume();
-                            return false;
-                        }
-                    } catch(IOException e) {
-                        onTransportFailure(e);
-                        return true;
-                    }
+                    secure_read(ByteBuffer.allocate(0));
                     break;
 
                 case FINISHED:
-
                 case NOT_HANDSHAKING:
-                    return true;
+                    break;
 
                 default:
-                    SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus();
-                    System.out.println("Unexpected ssl engine handshake status: "+ status);
+                    System.err.println("Unexpected ssl engine handshake status: "+ engine.getHandshakeStatus());
+                    break;
             }
+        } catch (IOException e ) {
+            onTransportFailure(e);
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1058997&r1=1058996&r2=1058997&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Fri Jan 14 13:09:56 2011
@@ -173,8 +173,8 @@ public class TcpTransport extends JavaBa
     protected SocketState socketState = new DISCONNECTED();
 
     protected DispatchQueue dispatchQueue;
-    protected DispatchSource readSource;
-    protected DispatchSource writeSource;
+    private DispatchSource readSource;
+    private DispatchSource writeSource;
 
     protected boolean useLocalHost = true;
     protected boolean full = false;
@@ -186,11 +186,27 @@ public class TcpTransport extends JavaBa
     class RateLimitingChannel implements ReadableByteChannel, WritableByteChannel {
 
         int read_allowance = max_read_rate;
+        boolean read_suspended = false;
+        int read_resume_counter = 0;
         int write_allowance = max_write_rate;
+        boolean write_suspended = false;
 
         public void resetAllowance() {
-            read_allowance = max_read_rate;
-            write_allowance = max_write_rate;
+            if( read_allowance != max_read_rate || write_allowance != max_write_rate) {
+                read_allowance = max_read_rate;
+                write_allowance = max_write_rate;
+                if( write_suspended ) {
+                    write_suspended = false;
+                    resumeWrite();
+                }
+                if( read_suspended ) {
+                    read_suspended = false;
+                    resumeRead();
+                    for( int i=0; i < read_resume_counter ; i++ ) {
+                        resumeRead();
+                    }
+                }
+            }
         }
 
         public int read(ByteBuffer dst) throws IOException {
@@ -207,13 +223,22 @@ public class TcpTransport extends JavaBa
                     reduction = remaining - read_allowance;
                     dst.limit(dst.limit() - reduction);
                 }
+                int rc=0;
                 try {
-                    return channel.read(dst);
+                    rc = channel.read(dst);
+                    read_allowance -= rc;
                 } finally {
                     if( reduction!=0 ) {
+                        if( dst.remaining() == 0 ) {
+                            // we need to suspend the read now until we get
+                            // a new allowance..
+                            readSource.suspend();
+                            read_suspended = true;
+                        }
                         dst.limit(dst.limit() + reduction);
                     }
                 }
+                return rc;
             }
         }
 
@@ -231,13 +256,22 @@ public class TcpTransport extends JavaBa
                     reduction = remaining - write_allowance;
                     src.limit(src.limit() - reduction);
                 }
+                int rc = 0;
                 try {
-                    return channel.write(src);
+                    rc = channel.write(src);
+                    write_allowance -= rc;
                 } finally {
                     if( reduction!=0 ) {
+                        if( src.remaining() == 0 ) {
+                            // we need to suspend the write now until we get
+                            // a new allowance..
+                            write_suspended = true;
+                            suspendWrite();
+                        }
                         src.limit(src.limit() + reduction);
                     }
                 }
+                return rc;
             }
         }
 
@@ -249,6 +283,14 @@ public class TcpTransport extends JavaBa
             channel.close();
         }
 
+        public void resumeRead() {
+            if( read_suspended ) {
+                read_resume_counter += 1;
+            } else {
+                _resumeRead();
+            }
+        }
+
     }
 
     private final Runnable CANCEL_HANDLER = new Runnable() {
@@ -345,6 +387,7 @@ public class TcpTransport extends JavaBa
                 });
                 readSource.setCancelHandler(CANCEL_HANDLER);
                 readSource.resume();
+
             } else if (socketState.is(CONNECTED.class) ) {
                 dispatchQueue.dispatchAsync(new Runnable() {
                     public void run() {
@@ -357,7 +400,7 @@ public class TcpTransport extends JavaBa
                     }
                 });
             } else {
-                System.err.println("cannot be started.  socket state is: "+socketState); 
+                System.err.println("cannot be started.  socket state is: "+socketState);
             }
         } finally {
             if( onCompleted!=null ) {
@@ -461,9 +504,11 @@ public class TcpTransport extends JavaBa
             switch (rc ) {
                 case FULL:
                     return false;
-                case WAS_EMPTY:
-                    writeSource.resume();
                 default:
+                    if( drained ) {
+                        drained = false;
+                        resumeWrite();
+                    }
                     return true;
             }
         } catch (IOException e) {
@@ -473,6 +518,8 @@ public class TcpTransport extends JavaBa
 
     }
 
+
+    boolean drained = true;
     /**
      *
      */
@@ -483,8 +530,11 @@ public class TcpTransport extends JavaBa
         }
         try {
             if( codec.flush() == ProtocolCodec.BufferState.EMPTY && flush() ) {
-                writeSource.suspend();
-                listener.onRefill();
+                if( !drained ) {
+                    drained = true;
+                    suspendWrite();
+                    listener.onRefill();
+                }
             }
         } catch (IOException e) {
             onTransportFailure(e);
@@ -559,10 +609,33 @@ public class TcpTransport extends JavaBa
 
     public void resumeRead() {
         if( isConnected() && readSource!=null ) {
-            readSource.resume();
+            if( rateLimitingChannel!=null ) {
+                rateLimitingChannel.resumeRead();
+            } else {
+                _resumeRead();
+            }
+        }
+    }
+    private void _resumeRead() {
+        readSource.resume();
+        dispatchQueue.execute(new Runnable(){
+            public void run() {
+                drainInbound();
+            }
+        });
+    }
+
+    protected void suspendWrite() {
+        if( isConnected() && writeSource!=null ) {
+            writeSource.suspend();
+        }
+    }
+    protected void resumeWrite() {
+        if( isConnected() && writeSource!=null ) {
+            writeSource.resume();
             dispatchQueue.execute(new Runnable(){
                 public void run() {
-                    drainInbound();
+                    drainOutbound();
                 }
             });
         }