You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2013/01/14 23:24:10 UTC

svn commit: r1433175 - in /tomcat/trunk/java/org/apache: coyote/http11/upgrade/ tomcat/util/net/

Author: markt
Date: Mon Jan 14 22:24:09 2013
New Revision: 1433175

URL: http://svn.apache.org/viewvc?rev=1433175&view=rev
Log:
First pass at getting HTTP upgrade working for APR/native. I'm testing this with WebSocket. Autobahn doesn't trigger a crash :) but there are a number of failures I still need to investigate :(

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java Mon Jan 14 22:24:09 2013
@@ -89,7 +89,8 @@ public abstract class AbstractProcessor<
             // Unexpected state
             return SocketState.CLOSED;
         }
-        if (upgradeServletInputStream.isCloseRequired()) {
+        if (upgradeServletInputStream.isCloseRequired() ||
+                upgradeServletOutputStream.isCloseRequired()) {
             return SocketState.CLOSED;
         }
         return SocketState.UPGRADED;

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletInputStream.java Mon Jan 14 22:24:09 2013
@@ -28,7 +28,7 @@ public abstract class AbstractServletInp
     protected static final StringManager sm =
             StringManager.getManager(Constants.Package);
 
-    private boolean closeRequired = false;
+    private volatile boolean closeRequired = false;
     // Start in blocking-mode
     private volatile Boolean ready = Boolean.TRUE;
     private volatile ReadListener listener = null;
@@ -124,6 +124,7 @@ public abstract class AbstractServletInp
 
     @Override
     public void close() throws IOException {
+        closeRequired = true;
         doClose();
     }
 
@@ -176,6 +177,11 @@ public abstract class AbstractServletInp
 
     protected abstract boolean doIsReady() throws IOException;
 
+    /**
+     * Abstract method to be overridden by concrete implementations. The base
+     * class will ensure that there are no concurrent calls to this method for
+     * the same socket.
+     */
     protected abstract int doRead(boolean block, byte[] b, int off, int len)
             throws IOException;
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Mon Jan 14 22:24:09 2013
@@ -31,6 +31,7 @@ public abstract class AbstractServletOut
     private final Object fireListenerLock = new Object();
     private final Object writeLock = new Object();
 
+    private volatile boolean closeRequired = false;
     // Start in blocking-mode
     private volatile WriteListener listener = null;
     private volatile boolean fireListener = false;
@@ -61,6 +62,10 @@ public abstract class AbstractServletOut
         this.listener = listener;
     }
 
+    protected final boolean isCloseRequired() {
+        return closeRequired;
+    }
+
     @Override
     public void write(int b) throws IOException {
         preWriteChecks();
@@ -79,6 +84,7 @@ public abstract class AbstractServletOut
 
     @Override
     public void close() throws IOException {
+        closeRequired = true;
         doClose();
     }
 
@@ -132,6 +138,11 @@ public abstract class AbstractServletOut
         }
     }
 
+    /**
+     * Abstract method to be overridden by concrete implementations. The base
+     * class will ensure that there are no concurrent calls to this method for
+     * the same socket.
+     */
     protected abstract int doWrite(boolean block, byte[] b, int off, int len)
             throws IOException;
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletInputStream.java Mon Jan 14 22:24:09 2013
@@ -17,69 +17,93 @@
 package org.apache.coyote.http11.upgrade;
 
 import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 public class AprServletInputStream extends AbstractServletInputStream {
 
+    private final SocketWrapper<Long> wrapper;
     private final long socket;
+    private final Lock blockingStatusReadLock;
+    private final WriteLock blockingStatusWriteLock;
+    private volatile boolean eagain = false;
 
 
     public AprServletInputStream(SocketWrapper<Long> wrapper) {
+        this.wrapper = wrapper;
         this.socket = wrapper.getSocket().longValue();
+        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        this.blockingStatusReadLock = lock.readLock();
+        this.blockingStatusWriteLock =lock.writeLock();
     }
-/*
-    @Override
-    protected int doRead() throws IOException {
-        byte[] bytes = new byte[1];
-        int result = Socket.recv(socket, bytes, 0, 1);
-        if (result == -1) {
-            return -1;
-        } else {
-            return bytes[0] & 0xFF;
-        }
-    }
+
 
     @Override
-    protected int doRead(byte[] b, int off, int len) throws IOException {
-        boolean block = true;
-        if (!block) {
-            Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
-        }
+    protected int doRead(boolean block, byte[] b, int off, int len)
+            throws IOException {
+
+        boolean readDone = false;
+        int result = 0;
         try {
-            int result = Socket.recv(socket, b, off, len);
-            if (result > 0) {
-                return result;
-            } else if (-result == Status.EAGAIN) {
-                return 0;
-            } else {
-                throw new IOException(sm.getString("apr.error",
-                        Integer.valueOf(-result)));
+            blockingStatusReadLock.lock();
+            if (wrapper.getBlockingStatus() == block) {
+                result = Socket.recv(socket, b, off, len);
+                readDone = true;
             }
         } finally {
-            if (!block) {
-                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+            blockingStatusReadLock.unlock();
+        }
+
+        if (!readDone) {
+            try {
+                blockingStatusWriteLock.lock();
+                wrapper.setBlockingStatus(block);
+                // Set the current settings for this socket
+                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, (block ? 0 : 1));
+                // Downgrade the lock
+                try {
+                    blockingStatusReadLock.lock();
+                    blockingStatusWriteLock.unlock();
+                    result = Socket.recv(socket, b, off, len);
+                } finally {
+                    blockingStatusReadLock.unlock();
+                }
+            } finally {
+                // Should have been released above but may not have been on some
+                // exception paths
+                if (blockingStatusWriteLock.isHeldByCurrentThread()) {
+                    blockingStatusWriteLock.unlock();
+                }
             }
         }
-    }
-}
-*/
 
-    @Override
-    protected int doRead(boolean block, byte[] b, int off, int len)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return 0;
+        if (result > 0) {
+            eagain = false;
+            return result;
+        } else if (-result == Status.EAGAIN) {
+            eagain = true;
+            return 0;
+        } else {
+            throw new IOException(sm.getString("apr.read.error",
+                    Integer.valueOf(-result)));
+        }
     }
 
+
     @Override
     protected boolean doIsReady() {
-        // TODO Auto-generated method stub
-        return false;
+        return !eagain;
     }
 
+
     @Override
     protected void doClose() throws IOException {
-        // TODO Auto-generated method stub
+        // NO-OP
+        // Let AbstractProcessor trigger the close
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java Mon Jan 14 22:24:09 2013
@@ -17,24 +17,74 @@
 package org.apache.coyote.http11.upgrade;
 
 import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.tomcat.jni.Socket;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 public class AprServletOutputStream extends AbstractServletOutputStream {
 
+    private final SocketWrapper<Long> wrapper;
     private final long socket;
-
+    private final Lock blockingStatusReadLock;
+    private final WriteLock blockingStatusWriteLock;
 
     public AprServletOutputStream(SocketWrapper<Long> wrapper) {
+        this.wrapper = wrapper;
         this.socket = wrapper.getSocket().longValue();
+        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        this.blockingStatusReadLock = lock.readLock();
+        this.blockingStatusWriteLock =lock.writeLock();
     }
 
 
     @Override
     protected int doWrite(boolean block, byte[] b, int off, int len)
             throws IOException {
-        // TODO Auto-generated method stub
-        return 0;
+
+        boolean writeDone = false;
+        int result = 0;
+        try {
+            blockingStatusReadLock.lock();
+            if (wrapper.getBlockingStatus() == block) {
+                result = Socket.send(socket, b, off, len);
+                writeDone = true;
+            }
+        } finally {
+            blockingStatusReadLock.unlock();
+        }
+
+        if (!writeDone) {
+            try {
+                blockingStatusWriteLock.lock();
+                wrapper.setBlockingStatus(block);
+                // Set the current settings for this socket
+                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, (block ? -1 : 0));
+                // Downgrade the lock
+                try {
+                    blockingStatusReadLock.lock();
+                    blockingStatusWriteLock.unlock();
+                    result = Socket.send(socket, b, off, len);
+                } finally {
+                    blockingStatusReadLock.unlock();
+                }
+            } finally {
+                // Should have been released above but may not have been on some
+                // exception paths
+                if (blockingStatusWriteLock.isHeldByCurrentThread()) {
+                    blockingStatusWriteLock.unlock();
+                }
+            }
+        }
+
+        if (result < 0) {
+            throw new IOException(sm.getString("apr.write.error",
+                    Integer.valueOf(-result)));
+        }
+
+        return result;
     }
 
     @Override
@@ -45,6 +95,7 @@ public class AprServletOutputStream exte
 
     @Override
     protected void doClose() throws IOException {
-        // TODO Auto-generated method stub
+        // NO-OP
+        // Let AbstractProcessor trigger the close
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties Mon Jan 14 22:24:09 2013
@@ -21,7 +21,8 @@ upgrade.sos.canWrite.ise=It is illegal t
 upgrade.sos.writeListener.null=It is illegal to pass null to setWriteListener()
 upgrade.sis.write.ise=It is illegal to call any of the write() methods in non-blocking mode without first checking that there is space available by calling canWrite()
 
-apr.error=Unexpected error [{0}] reading data from the APR/native socket.
+apr.read.error=Unexpected error [{0}] reading data from the APR/native socket.
+apr.write.error=Unexpected error [{0}] writing data to the APR/native socket.
 
 nio.eof.error=Unexpected EOF read on the socket
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1433175&r1=1433174&r2=1433175&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Mon Jan 14 22:24:09 2013
@@ -37,7 +37,11 @@ public class SocketWrapper<E> {
     private int remotePort = -1;
     private String remoteHost = null;
     private String remoteAddr = null;
-
+    /*
+     * Used if block/non-blocking is set at the socket level. The client is
+     * responsible for the thread-safe use of this field.
+     */
+    private volatile boolean blockingStatus = true;
 
     public SocketWrapper(E socket) {
         this.socket = socket;
@@ -74,4 +78,8 @@ public class SocketWrapper<E> {
     public void setRemoteHost(String remoteHost) {this.remoteHost = remoteHost; }
     public String getRemoteAddr() { return remoteAddr; }
     public void setRemoteAddr(String remoteAddr) {this.remoteAddr = remoteAddr; }
+    public boolean getBlockingStatus() { return blockingStatus; }
+    public void setBlockingStatus(boolean blockingStatus) {
+        this.blockingStatus = blockingStatus;
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org