You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2014/03/10 12:27:12 UTC

svn commit: r1575905 [2/4] - in /tomcat/trunk: ./ java/org/apache/coyote/ajp/ java/org/apache/coyote/http11/ java/org/apache/coyote/http11/upgrade/ java/org/apache/tomcat/util/net/ webapps/docs/ webapps/docs/config/

Added: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,487 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.coyote.http11;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.servlet.RequestDispatcher;
+
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Response;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Output buffer implementation for NIO2.
+ */
+public class InternalNio2OutputBuffer extends AbstractOutputBuffer<Nio2Channel> {
+
+    // ----------------------------------------------------------- Constructors
+
+    /**
+     * Creates the output buffer for a response and installs a
+     * {@link SocketOutputBuffer} as the stream output buffer.
+     *
+     * @param response          the response this buffer belongs to; passed to
+     *                          the superclass constructor
+     * @param headerBufferSize  passed to the superclass constructor
+     */
+    public InternalNio2OutputBuffer(Response response, int headerBufferSize) {
+
+        super(response, headerBufferSize);
+
+        // SocketOutputBuffer.doWrite() forwards every chunk to addToBB()
+        outputStreamOutputBuffer = new SocketOutputBuffer();
+    }
+
+    /**
+     * Zero-length array handed to {@code toArray} when the list of buffers is
+     * converted to the array used for a gathering write.
+     */
+    private static final ByteBuffer[] EMPTY_BUF_ARRAY = new ByteBuffer[0];
+
+    /**
+     * Underlying socket. Set by {@code init} and cleared by {@code recycle}.
+     */
+    private SocketWrapper<Nio2Channel> socket;
+
+    /**
+     * Track write interest. Set by {@code registerWriteInterest}; cleared by
+     * the completion handlers when they dispatch an OPEN_WRITE event.
+     */
+    protected volatile boolean interest = false;
+
+    /**
+     * Track if the channel's main write buffer is flipped (ready to be read
+     * from by a socket write) rather than being filled.
+     */
+    protected volatile boolean flipped = false;
+
+    /**
+     * The completion handler used for asynchronous writes of the single main
+     * write buffer. Also used as the monitor guarding the buffered writes.
+     */
+    protected CompletionHandler<Integer, SocketWrapper<Nio2Channel>> completionHandler;
+
+    /**
+     * The completion handler used for asynchronous gathering writes. Its
+     * attachment is the array of buffers that is being written.
+     */
+    protected CompletionHandler<Long, ByteBuffer[]> gatherCompletionHandler;
+
+    /**
+     * Write pending flag. A single permit: it is taken before an asynchronous
+     * write is started and released by the completion handlers.
+     */
+    protected Semaphore writePending = new Semaphore(1);
+
+    /**
+     * The associated endpoint.
+     */
+    protected AbstractEndpoint<Nio2Channel> endpoint = null;
+
+    /**
+     * Data that could not be placed in the main write buffer and is waiting
+     * for a gathering write. The buffers are stored unflipped.
+     * Used instead of the deque since it looks equivalent and simpler.
+     */
+    protected ArrayList<ByteBuffer> bufferedWrites = new ArrayList<>();
+
+    /**
+     * Exception that occurred during writing. Recorded by the completion
+     * handlers and rethrown by {@code flushBuffer}.
+     */
+    protected IOException e = null;
+
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Associates this buffer with a socket and an endpoint and creates the two
+     * completion handlers that drive asynchronous writes.
+     * <p>
+     * Both handlers synchronize on {@code completionHandler}. Once a write
+     * completes they either start a follow-up write for the data that is still
+     * waiting, or release the {@code writePending} permit. If write interest
+     * was registered and the completion is not inline, an OPEN_WRITE event is
+     * dispatched to the endpoint after the lock has been released.
+     *
+     * @param socketWrapper       the socket to write to
+     * @param associatedEndpoint  the endpoint used to dispatch socket events
+     * @throws IOException declared by the overridden method; not thrown here
+     */
+    @Override
+    public void init(SocketWrapper<Nio2Channel> socketWrapper,
+            AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException {
+        this.socket = socketWrapper;
+        this.endpoint = associatedEndpoint;
+
+        // Handler for writes of the single main write buffer
+        this.completionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+                boolean notify = false;
+                synchronized (completionHandler) {
+                    if (nBytes < 0) {
+                        failed(new IOException(sm.getString("iob.failedwrite")), attachment);
+                        return;
+                    }
+                    // This handler is only used for writes of the main write
+                    // buffer, which was flipped before the write was started,
+                    // so whatever remains in it was not written yet
+                    Nio2Channel channel = attachment.getSocket();
+                    ByteBuffer mainBuffer = (channel == null)
+                            ? null : channel.getBufHandler().getWriteBuffer();
+                    boolean mainHasData = mainBuffer != null && mainBuffer.hasRemaining();
+                    if (bufferedWrites.size() > 0) {
+                        // Continue writing data with a gathering write. The
+                        // unwritten rest of the main buffer must go first,
+                        // otherwise the output would be sent out of order
+                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
+                        if (mainHasData) {
+                            arrayList.add(mainBuffer);
+                        }
+                        for (ByteBuffer buffer : bufferedWrites) {
+                            buffer.flip();
+                            arrayList.add(buffer);
+                        }
+                        bufferedWrites.clear();
+                        ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
+                        socket.getSocket().write(array, 0, array.length,
+                                attachment.getTimeout(), TimeUnit.MILLISECONDS,
+                                array, gatherCompletionHandler);
+                    } else if (mainHasData) {
+                        // Partial write: keep the permit and write the rest,
+                        // so no data is left behind without a pending write
+                        channel.write(mainBuffer, attachment.getTimeout(),
+                                TimeUnit.MILLISECONDS, attachment, completionHandler);
+                    } else {
+                        // Everything has been written
+                        if (interest && !Nio2Endpoint.isInline()) {
+                            interest = false;
+                            notify = true;
+                        }
+                        writePending.release();
+                    }
+                }
+                if (notify) {
+                    endpoint.processSocket(attachment, SocketStatus.OPEN_WRITE, true);
+                }
+            }
+
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                attachment.setError(true);
+                // Keep the failure so that the next flushBuffer() call throws it
+                if (exc instanceof IOException) {
+                    e = (IOException) exc;
+                } else {
+                    e = new IOException(exc);
+                }
+                response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
+                writePending.release();
+                endpoint.processSocket(attachment, SocketStatus.OPEN_WRITE, true);
+            }
+        };
+        // Handler for gathering writes; the attachment is the array being written
+        this.gatherCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() {
+            @Override
+            public void completed(Long nBytes, ByteBuffer[] attachment) {
+                boolean notify = false;
+                synchronized (completionHandler) {
+                    if (nBytes < 0) {
+                        failed(new IOException(sm.getString("iob.failedwrite")), attachment);
+                        return;
+                    }
+                    if (bufferedWrites.size() > 0 || arrayHasData(attachment)) {
+                        // Continue writing data: first what is left of the
+                        // previous array, then what was buffered meanwhile
+                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
+                        for (ByteBuffer buffer : attachment) {
+                            if (buffer.hasRemaining()) {
+                                arrayList.add(buffer);
+                            }
+                        }
+                        for (ByteBuffer buffer : bufferedWrites) {
+                            buffer.flip();
+                            arrayList.add(buffer);
+                        }
+                        bufferedWrites.clear();
+                        ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
+                        socket.getSocket().write(array, 0, array.length,
+                                socket.getTimeout(), TimeUnit.MILLISECONDS,
+                                array, gatherCompletionHandler);
+                    } else {
+                        // Everything has been written
+                        if (interest && !Nio2Endpoint.isInline()) {
+                            interest = false;
+                            notify = true;
+                        }
+                        writePending.release();
+                    }
+                }
+                if (notify) {
+                    endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, true);
+                }
+            }
+
+            @Override
+            public void failed(Throwable exc, ByteBuffer[] attachment) {
+                socket.setError(true);
+                // Keep the failure so that the next flushBuffer() call throws it
+                if (exc instanceof IOException) {
+                    e = (IOException) exc;
+                } else {
+                    e = new IOException(exc);
+                }
+                response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
+                writePending.release();
+                endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, true);
+           }
+        };
+    }
+
+
+    /**
+     * Returns this buffer to its initial state so that it can be reused. This
+     * should be called when closing the connection.
+     */
+    @Override
+    public void recycle() {
+        super.recycle();
+        socket = null;
+        e = null;
+        flipped = false;
+        interest = false;
+        // The semaphore must end up with exactly one permit, whatever state
+        // the last write left it in
+        final boolean singlePermit = writePending.availablePermits() == 1;
+        if (!singlePermit) {
+            writePending.drainPermits();
+            writePending.release();
+        }
+        bufferedWrites.clear();
+    }
+
+
+    /**
+     * Prepares the buffer for the next request: delegates to the superclass
+     * and resets the {@code flipped} and {@code interest} flags. The write
+     * permit and the buffered writes are left untouched here.
+     */
+    @Override
+    public void nextRequest() {
+        super.nextRequest();
+        flipped = false;
+        interest = false;
+    }
+
+    // ------------------------------------------------ HTTP/1.1 Output Methods
+
+    /**
+     * Queues the acknowledgment bytes ({@code Constants.ACK_BYTES}) for
+     * writing. Does nothing once the response has been committed.
+     *
+     * @throws IOException if adding the bytes to the output fails
+     */
+    @Override
+    public void sendAck() throws IOException {
+        if (committed) {
+            return;
+        }
+        final byte[] ack = Constants.ACK_BYTES;
+        addToBB(ack, 0, ack.length);
+    }
+
+    // ------------------------------------------------------ Protected Methods
+
+    /**
+     * Marks the response as committed and queues the content of the header
+     * buffer, if any, for writing.
+     *
+     * @throws IOException an underlying I/O error occurred
+     */
+    @Override
+    protected void commit() throws IOException {
+        // From here on the response counts as committed
+        committed = true;
+        response.setCommitted(true);
+
+        if (pos <= 0) {
+            // No header bytes to send
+            return;
+        }
+        addToBB(headerBuffer, 0, pos);
+    }
+
+    /**
+     * Tells whether at least one buffer of the array still has bytes left.
+     *
+     * @param byteBuffers the buffers to inspect
+     * @return {@code true} if any buffer has remaining bytes
+     */
+    private static boolean arrayHasData(ByteBuffer[] byteBuffers) {
+        boolean found = false;
+        for (int i = 0; i < byteBuffers.length && !found; i++) {
+            found = byteBuffers[i].hasRemaining();
+        }
+        return found;
+    }
+
+    /**
+     * Adds bytes to the output.
+     * <p>
+     * In blocking mode the bytes are copied into the channel's write buffer
+     * and a blocking flush is done every time that buffer is full. In non
+     * blocking mode the bytes go into the write buffer only if the write
+     * permit can be taken without waiting; whatever does not fit, or all of
+     * it if a write is already pending, is appended to the buffered writes.
+     * <p>
+     * Returns without doing anything if {@code length} is 0 or if there is no
+     * socket.
+     *
+     * @param buf     source array
+     * @param offset  index of the first byte to add
+     * @param length  number of bytes to add
+     * @throws IOException if flushing fails
+     */
+    private void addToBB(byte[] buf, int offset, int length)
+            throws IOException {
+
+        if (length == 0)
+            return;
+        if (socket == null || socket.getSocket() == null)
+            return;
+
+        ByteBuffer writeByteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();
+
+        socket.access();
+
+        if (isBlocking()) {
+            // NOTE(review): this loop assumes that flushBuffer(true) has
+            // emptied the write buffer before the next transfer - confirm
+            while (length > 0) {
+                int thisTime = transfer(buf, offset, length, writeByteBuffer);
+                length = length - thisTime;
+                offset = offset + thisTime;
+                if (writeByteBuffer.remaining() == 0) {
+                    flushBuffer(true);
+                }
+            }
+        } else {
+            // FIXME: Possible new behavior:
+            // If there's non blocking abuse (like a test writing 1MB in a single
+            // "non blocking" write), then block until the previous write is
+            // done rather than continue buffering
+            // Also allows doing autoblocking
+            // Could be "smart" with coordination with the main CoyoteOutputStream to
+            // indicate the end of a write
+            // Uses: if (writePending.tryAcquire(socket.getTimeout(), TimeUnit.MILLISECONDS))
+            if (writePending.tryAcquire()) {
+                synchronized (completionHandler) {
+                    // No pending completion handler, so writing to the main buffer
+                    // is possible
+                    int thisTime = transfer(buf, offset, length, writeByteBuffer);
+                    length = length - thisTime;
+                    offset = offset + thisTime;
+                    if (length > 0) {
+                        // Remaining data must be buffered
+                        addToBuffers(buf, offset, length);
+                    }
+                    // The permit taken above is handed over to the flush
+                    flushBufferInternal(false, true);
+                }
+            } else {
+                // A write is pending: only buffer, under the same lock the
+                // completion handlers use when they drain the buffered writes
+                synchronized (completionHandler) {
+                    addToBuffers(buf, offset, length);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Copies the given bytes into a newly allocated buffer and appends that
+     * buffer to the buffered writes. The buffer is left unflipped.
+     *
+     * @param buf     source array
+     * @param offset  index of the first byte to copy
+     * @param length  number of bytes to copy
+     */
+    private void addToBuffers(byte[] buf, int offset, int length) {
+        // Never allocate less than bufferedWriteSize
+        final int capacity = Math.max(bufferedWriteSize, length);
+        final ByteBuffer overflow = ByteBuffer.allocate(capacity);
+        overflow.put(buf, offset, length);
+        bufferedWrites.add(overflow);
+    }
+
+
+    /**
+     * Copies as many bytes as fit from the array into the buffer.
+     *
+     * @param from    source array
+     * @param offset  index of the first byte to copy
+     * @param length  number of bytes available in the source
+     * @param to      destination buffer
+     * @return the number of bytes that were copied
+     */
+    private int transfer(byte[] from, int offset, int length, ByteBuffer to) {
+        final int room = to.remaining();
+        final int count = (length < room) ? length : room;
+        to.put(from, offset, count);
+        return count;
+    }
+
+
+    /**
+     * Callback to write data from the buffer. Throws the exception recorded
+     * by an earlier failed write, if there is one, before flushing.
+     *
+     * @param block {@code true} to flush with blocking writes
+     * @return the result of {@code flushBufferInternal(block, false)}
+     * @throws IOException the recorded write failure, or a failure of this
+     *                     flush
+     */
+    @Override
+    protected boolean flushBuffer(boolean block) throws IOException {
+        final IOException recorded = e;
+        if (recorded != null) {
+            throw recorded;
+        }
+        final boolean moreData = flushBufferInternal(block, false);
+        return moreData;
+    }
+
+    /**
+     * Writes the content of the main write buffer, and in non blocking mode
+     * also the buffered writes, to the socket.
+     * <p>
+     * A blocking flush writes until the main write buffer is empty, so that
+     * callers can rely on the buffer being available for filling again. A non
+     * blocking flush starts one asynchronous write, provided the write permit
+     * is available or already held by the caller.
+     *
+     * @param block      {@code true} to write with blocking writes
+     * @param hasPermit  {@code true} if the caller already holds the
+     *                   {@code writePending} permit (non blocking only)
+     * @return {@code false} if there is no socket, else the value of
+     *         {@code hasMoreDataToFlush()} after the flush
+     * @throws IOException if a blocking write fails, reports a negative
+     *                     byte count, is interrupted or times out
+     */
+    private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOException {
+        if (socket == null || socket.getSocket() == null)
+            return false;
+
+        ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();
+        if (block) {
+            if (!isBlocking()) {
+                // The final flush is blocking, but the processing was using
+                // non blocking so wait until an async write is done
+                try {
+                    if (writePending.tryAcquire(socket.getTimeout(), TimeUnit.MILLISECONDS)) {
+                        writePending.release();
+                    }
+                } catch (InterruptedException ignored) {
+                    // Best effort wait only: carry on with the blocking write
+                }
+            }
+            if (hasMoreDataToFlush()) {
+                if (!flipped) {
+                    byteBuffer.flip();
+                    flipped = true;
+                }
+                try {
+                    // A single write may be partial, so keep writing until
+                    // the buffer has been emptied
+                    while (byteBuffer.hasRemaining()) {
+                        Integer written = socket.getSocket().write(byteBuffer)
+                                .get(socket.getTimeout(), TimeUnit.MILLISECONDS);
+                        if (written.intValue() < 0) {
+                            throw new IOException(sm.getString("iob.failedwrite"));
+                        }
+                    }
+                } catch (InterruptedException ie) {
+                    // Do not lose the interrupt for the code up the stack
+                    Thread.currentThread().interrupt();
+                    throw new IOException(ie);
+                } catch (ExecutionException ee) {
+                    throw new IOException(ee);
+                } catch (TimeoutException te) {
+                    throw new SocketTimeoutException();
+                }
+                // Everything was written, the buffer can be filled again
+                byteBuffer.clear();
+                flipped = false;
+            } else {
+                byteBuffer.clear();
+                flipped = false;
+            }
+        } else {
+            synchronized (completionHandler) {
+                if (hasPermit || writePending.tryAcquire()) {
+                    //prevent timeout for async
+                    socket.access();
+                    if (!flipped) {
+                        byteBuffer.flip();
+                        flipped = true;
+                    }
+                    // The completion handlers check Nio2Endpoint.isInline()
+                    // before dispatching an OPEN_WRITE event
+                    Nio2Endpoint.startInline();
+                    if (bufferedWrites.size() > 0) {
+                        // Gathering write of the main buffer plus all leftovers
+                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
+                        if (byteBuffer.hasRemaining()) {
+                            arrayList.add(byteBuffer);
+                        }
+                        for (ByteBuffer buffer : bufferedWrites) {
+                            buffer.flip();
+                            arrayList.add(buffer);
+                        }
+                        bufferedWrites.clear();
+                        ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
+                        socket.getSocket().write(array, 0, array.length, socket.getTimeout(),
+                                TimeUnit.MILLISECONDS, array, gatherCompletionHandler);
+                    } else if (byteBuffer.hasRemaining()) {
+                        // Regular write
+                        socket.getSocket().write(byteBuffer, socket.getTimeout(),
+                                TimeUnit.MILLISECONDS, socket, completionHandler);
+                    } else {
+                        // Nothing was written
+                        writePending.release();
+                    }
+                    Nio2Endpoint.endInline();
+                    // The permit is back already if nothing had to be written
+                    // or if the write has completed in the meantime
+                    if (writePending.availablePermits() > 0) {
+                        if (byteBuffer.remaining() == 0) {
+                            byteBuffer.clear();
+                            flipped = false;
+                        }
+                    }
+                }
+            }
+        }
+        return hasMoreDataToFlush();
+    }
+
+
+    /**
+     * Tells whether output is still waiting to be written: bytes in the main
+     * write buffer, a write in progress, buffered writes or a recorded write
+     * error.
+     *
+     * @return {@code true} if any of these conditions holds
+     */
+    @Override
+    protected boolean hasMoreDataToFlush() {
+        final ByteBuffer writeBuffer = socket.getSocket().getBufHandler().getWriteBuffer();
+        if (flipped && writeBuffer.remaining() > 0) {
+            // Flipped: the bytes up to the limit were not written yet
+            return true;
+        }
+        if (!flipped && writeBuffer.position() > 0) {
+            // Not flipped: bytes were added since the last flush
+            return true;
+        }
+        if (writePending.availablePermits() == 0) {
+            return true;
+        }
+        if (bufferedWrites.size() > 0) {
+            return true;
+        }
+        return e != null;
+    }
+
+
+    /**
+     * Records write interest by setting the {@code interest} flag. The
+     * completion handlers read this flag when a write has completed.
+     *
+     * @throws IOException declared by the overridden method; not thrown here
+     */
+    @Override
+    protected void registerWriteInterest() throws IOException {
+        interest = true;
+    }
+
+
+    // ----------------------------------- OutputStreamOutputBuffer Inner Class
+
+    /**
+     * Output buffer that hands every chunk it receives to
+     * {@code addToBB} and keeps count of the bytes it was given.
+     */
+    protected class SocketOutputBuffer implements OutputBuffer {
+
+        /**
+         * Adds the content of the chunk to the output.
+         *
+         * @param chunk the bytes to write
+         * @param res   not used
+         * @return the length of the chunk
+         * @throws IOException if adding the bytes to the output fails
+         */
+        @Override
+        public int doWrite(ByteChunk chunk, Response res) throws IOException {
+            final int length = chunk.getLength();
+            final int first = chunk.getStart();
+            addToBB(chunk.getBuffer(), first, length);
+            byteCount += length;
+            return length;
+        }
+
+        /**
+         * @return the current value of {@code byteCount}
+         */
+        @Override
+        public long getBytesWritten() {
+            return byteCount;
+        }
+    }
+}

Modified: tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties?rev=1575905&r1=1575904&r2=1575905&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/LocalStrings.properties Mon Mar 10 11:27:11 2014
@@ -38,6 +38,7 @@ iib.readtimeout=Timeout attempting to re
 iib.requestheadertoolarge.error=Request header is too large
 iib.socketClosed=The socket has been closed in another thread
 
+iob.failedwrite=Failed write
 iob.failedwrite.ack=Failed to send HTTP 100 continue response
 iob.illegalreset=The response may not be reset once it has been committed
 iob.responseheadertoolarge.error=An attempt was made to write more data to the response headers than there was room available in the buffer. Increase maxHttpHeaderSize on the connector or write less data into the response headers.

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2Processor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2Processor.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2Processor.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2Processor.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import javax.servlet.http.HttpUpgradeHandler;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Upgrade processor for NIO2 connections. It hands NIO2 based servlet
+ * streams to the superclass and sets the timeout of the socket wrapper
+ * to -1.
+ */
+public class Nio2Processor extends AbstractProcessor<Nio2Channel> {
+
+    /** Timeout value that is set on the socket wrapper of the connection. */
+    private static final int INFINITE_TIMEOUT = -1;
+
+    private static final Log log = LogFactory.getLog(Nio2Processor.class);
+
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+    /**
+     * Creates the processor.
+     *
+     * @param wrapper               the socket wrapper of the connection
+     * @param httpUpgradeProcessor  the upgrade handler, passed to the
+     *                              superclass
+     */
+    public Nio2Processor(SocketWrapper<Nio2Channel> wrapper,
+            HttpUpgradeHandler httpUpgradeProcessor) {
+        super(httpUpgradeProcessor, new Nio2ServletInputStream(wrapper),
+                new Nio2ServletOutputStream(wrapper));
+        wrapper.setTimeout(INFINITE_TIMEOUT);
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,229 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+public class Nio2ServletInputStream extends AbstractServletInputStream {
+
+    // Socket wrapper of the connection; provides the read timeout
+    private final SocketWrapper<Nio2Channel> wrapper;
+    // Channel the data is read from; obtained from the wrapper
+    private final Nio2Channel channel;
+    // Handler for non blocking reads, also used as the monitor of this stream
+    private final CompletionHandler<Integer, SocketWrapper<Nio2Channel>> completionHandler;
+    // True while the read buffer is flipped, i.e. data can be taken out of it
+    private boolean flipped = false;
+    // True from the start of a read until its completion
+    private volatile boolean readPending = false;
+
+    /**
+     * Creates the stream for the given socket wrapper and sets up the
+     * completion handler that is used for non blocking reads.
+     *
+     * @param wrapper the socket wrapper whose channel is read from
+     */
+    public Nio2ServletInputStream(SocketWrapper<Nio2Channel> wrapper) {
+        this.wrapper = wrapper;
+        this.channel = wrapper.getSocket();
+        this.completionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+                synchronized (completionHandler) {
+                    // A negative byte count is handled as a closed channel
+                    if (nBytes < 0) {
+                        failed(new ClosedChannelException(), attachment);
+                        return;
+                    }
+                    readPending = false;
+                }
+                if (nBytes > 0) {
+                    // No callback for an inline completion; fillReadBuffer()
+                    // returns the number of bytes to its caller in that case
+                    if (!Nio2Endpoint.isInline()) {
+                        try {
+                            onDataAvailable();
+                        } catch (IOException e) {
+                            failed(e, attachment);
+                        }
+                    }
+                } else {
+                    // A read of 0 bytes is reported as all data read
+                    try {
+                        onAllDataRead();
+                    } catch (IOException e) {
+                        failed(e, attachment);
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                // Flag the socket, report the error and close the stream
+                attachment.setError(true);
+                readPending = false;
+                onError(exc);
+                try {
+                    close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
+        };
+    }
+
+    /**
+     * Checks whether data can be read without blocking. If the read buffer is
+     * empty, a non blocking read is started.
+     *
+     * @return {@code false} while a read is pending; {@code true} if the read
+     *         buffer holds data or if the read started here delivered data
+     *         right away
+     * @throws IOException if starting the read fails
+     */
+    @Override
+    protected boolean doIsReady() throws IOException {
+        synchronized (completionHandler) {
+            if (readPending) {
+                return false;
+            }
+            ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+            if (!flipped) {
+                readBuffer.flip();
+                flipped = true;
+            }
+            if (readBuffer.remaining() > 0) {
+                return true;
+            }
+
+            // Nothing left in the buffer: reset it and try to read more
+            readBuffer.clear();
+            flipped = false;
+            int nRead = fillReadBuffer(false);
+
+            boolean isReady = nRead > 0;
+            if (isReady) {
+                // The read completed at once, make the data available
+                if (!flipped) {
+                    readBuffer.flip();
+                    flipped = true;
+                }
+                try {
+                    onDataAvailable();
+                } catch (IOException e) {
+                    onError(e);
+                    try {
+                        close();
+                    } catch (IOException ioe) {
+                        // Ignore
+                    }
+                }
+            }
+            return isReady;
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into the array. The bytes are taken from
+     * the read buffer first; if that is not enough, the read buffer is
+     * refilled once and the rest is taken from the new data.
+     *
+     * @param block  {@code true} to refill the read buffer with a blocking
+     *               read
+     * @param b      destination array
+     * @param off    index in {@code b} of the first byte to store
+     * @param len    maximum number of bytes to read
+     * @return the number of bytes stored in {@code b}; 0 while a read is
+     *         pending
+     * @throws IOException  if refilling the read buffer fails
+     * @throws EOFException if refilling the read buffer reports -1
+     */
+    @Override
+    protected int doRead(boolean block, byte[] b, int off, int len)
+            throws IOException {
+
+        synchronized (completionHandler) {
+            if (readPending) {
+                return 0;
+            }
+
+            final ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+            if (!flipped) {
+                readBuffer.flip();
+                flipped = true;
+            }
+
+            // Serve the request from the buffered data alone if possible
+            final int available = readBuffer.remaining();
+            if (available >= len) {
+                readBuffer.get(b, off, len);
+                return len;
+            }
+
+            // Take what the buffer has and count it
+            int copied = 0;
+            if (available > 0) {
+                readBuffer.get(b, off, available);
+                copied = available;
+            }
+
+            // Refill the now empty buffer
+            readBuffer.clear();
+            flipped = false;
+            final int nRead = fillReadBuffer(block);
+
+            if (nRead == -1) {
+                // TODO i18n
+                throw new EOFException();
+            }
+            if (nRead > 0) {
+                if (!flipped) {
+                    readBuffer.flip();
+                    flipped = true;
+                }
+                // Hand out as much of the new data as the caller has room for
+                final int chunk = Math.min(nRead, len - copied);
+                readBuffer.get(b, off + copied, chunk);
+                copied += chunk;
+            } else if (nRead == 0 && block && !flipped) {
+                readBuffer.flip();
+                flipped = true;
+            }
+
+            return copied;
+        }
+    }
+
+    /**
+     * Closes the channel. An {@link AsynchronousCloseException} raised by the
+     * close is ignored.
+     *
+     * @throws IOException if closing the channel fails in any other way
+     */
+    @Override
+    protected void doClose() throws IOException {
+        try {
+            channel.close();
+        } catch (AsynchronousCloseException ignored) {
+            // Ignore
+        }
+    }
+
+    /**
+     * Clears the read buffer and reads from the channel into it.
+     * <p>
+     * A blocking read waits for the result. The timeout of the wrapper is only
+     * used if it is positive; with a value of 0 or less the wait is unbounded,
+     * because the timed {@code get()} would otherwise give up without waiting
+     * at all. A non blocking read is started with the completion handler and
+     * only reports bytes if it completed before this method returns.
+     *
+     * @param block {@code true} to wait for the read to complete
+     * @return the number of bytes read, -1 if the blocking read reported it,
+     *         or 0 if a non blocking read is still pending
+     * @throws IOException if the blocking read fails, is interrupted or times
+     *                     out
+     */
+    private int fillReadBuffer(boolean block) throws IOException {
+        ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+        int nRead = 0;
+        readPending = true;
+        readBuffer.clear();
+        flipped = false;
+        if (block) {
+            long timeout = wrapper.getTimeout();
+            try {
+                if (timeout > 0) {
+                    nRead = channel.read(readBuffer).get(timeout, TimeUnit.MILLISECONDS);
+                } else {
+                    // No timeout configured: wait for as long as it takes
+                    nRead = channel.read(readBuffer).get();
+                }
+                readPending = false;
+            } catch (InterruptedException e) {
+                // Do not lose the interrupt for the code up the stack
+                Thread.currentThread().interrupt();
+                onError(e);
+                throw new IOException(e);
+            } catch (ExecutionException e) {
+                // The read has ended with a failure, so it is not pending
+                readPending = false;
+                onError(e);
+                throw new IOException(e);
+            } catch (TimeoutException e) {
+                onError(e);
+                throw new IOException(e);
+            }
+        } else {
+            // The handler does not call onDataAvailable() for a completion
+            // that happens between startInline() and endInline()
+            Nio2Endpoint.startInline();
+            channel.read(readBuffer,
+                    wrapper.getTimeout(), TimeUnit.MILLISECONDS, wrapper, completionHandler);
+            Nio2Endpoint.endInline();
+            if (!readPending) {
+                // Completed already: the position is the number of bytes read
+                nRead = readBuffer.position();
+            }
+        }
+        return nRead;
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,169 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.tomcat.util.net.Nio2Channel;
+import org.apache.tomcat.util.net.Nio2Endpoint;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Servlet output stream that writes through a {@link Nio2Channel}, either by
+ * waiting on the future returned by the channel (blocking mode) or by handing
+ * the write to a completion handler (non-blocking mode).
+ */
+public class Nio2ServletOutputStream extends AbstractServletOutputStream<Nio2Channel> {
+
+    private final Nio2Channel channel;
+    /** Capacity of the channel's write buffer; upper bound for one write. */
+    private final int maxWrite;
+    private final CompletionHandler<Integer, SocketWrapper<Nio2Channel>> completionHandler;
+    /** Set while a non-blocking write has been started and not yet completed. */
+    private volatile boolean writePending = false;
+
+    /**
+     * Creates the stream on top of the channel held by the socket wrapper and
+     * installs the completion handler used for non-blocking writes.
+     *
+     * @param socketWrapper wrapper providing the channel to write to
+     */
+    public Nio2ServletOutputStream(SocketWrapper<Nio2Channel> socketWrapper) {
+        super(socketWrapper);
+        channel = socketWrapper.getSocket();
+        maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
+        this.completionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
+            @Override
+            public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
+                synchronized (completionHandler) {
+                    if (nBytes < 0) {
+                        // A negative result is handled as a closed channel
+                        failed(new ClosedChannelException(), attachment);
+                        return;
+                    }
+                    writePending = false;
+                }
+                // Only notify when Nio2Endpoint reports a non-inline completion
+                if (!Nio2Endpoint.isInline()) {
+                    try {
+                        onWritePossible();
+                    } catch (IOException e) {
+                        failed(e, attachment);
+                    }
+                }
+            }
+            @Override
+            public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
+                attachment.setError(true);
+                writePending = false;
+                onError(exc);
+                try {
+                    close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
+        };
+    }
+
+    /**
+     * Writes the given bytes in slices of at most {@code maxWrite} bytes.
+     *
+     * @param block {@code true} to wait for each slice to be written
+     * @param b     source array
+     * @param off   offset of the first byte to write
+     * @param len   number of bytes to write
+     * @return the accumulated count reported for the slices that were written
+     * @throws IOException  if a write fails
+     */
+    @Override
+    protected int doWrite(boolean block, byte[] b, int off, int len)
+            throws IOException {
+        int leftToWrite = len;
+        int count = 0;
+        int offset = off;
+
+        while (leftToWrite > 0) {
+            int writeThisLoop = Math.min(leftToWrite, maxWrite);
+
+            int writtenThisLoop = doWriteInternal(block, b, offset, writeThisLoop);
+            if (writtenThisLoop < 0) {
+                throw new EOFException();
+            }
+            count += writtenThisLoop;
+            if (!block && writePending) {
+                // Prevent concurrent writes in non blocking mode,
+                // leftover data has to be buffered
+                return count;
+            }
+            offset += writtenThisLoop;
+            leftToWrite -= writtenThisLoop;
+
+            if (writtenThisLoop < writeThisLoop) {
+                // Short write: stop and report what was written so far
+                break;
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     * Copies one slice into the channel's write buffer and writes it.
+     *
+     * @param block {@code true} to wait up to the write timeout for the result
+     * @param b     source array
+     * @param off   offset of the first byte of the slice
+     * @param len   length of the slice
+     * @return in blocking mode the result reported by the write; in
+     *         non-blocking mode {@code len} if a write was started, or 0 if a
+     *         previous write is still pending
+     * @throws IOException if the blocking write is interrupted, fails or
+     *                     times out
+     */
+    private int doWriteInternal(boolean block, byte[] b, int off, int len)
+            throws IOException {
+        ByteBuffer buffer = channel.getBufHandler().getWriteBuffer();
+        int written = 0;
+        long writeTimeout = ((Nio2SocketWrapper) socketWrapper).getWriteTimeout();
+        if (block) {
+            buffer.clear();
+            buffer.put(b, off, len);
+            buffer.flip();
+            Future<Integer> pendingWrite = channel.write(buffer);
+            try {
+                written = pendingWrite.get(writeTimeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Do not leave a write running against the reusable buffer
+                pendingWrite.cancel(true);
+                onError(e);
+                // Preserve the interrupt for code further up the stack
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            } catch (TimeoutException e) {
+                // The write is still outstanding after the timeout: cancel it
+                pendingWrite.cancel(true);
+                onError(e);
+                throw new IOException(e);
+            } catch (ExecutionException e) {
+                onError(e);
+                throw new IOException(e);
+            }
+        } else {
+            synchronized (completionHandler) {
+                if (!writePending) {
+                    buffer.clear();
+                    buffer.put(b, off, len);
+                    buffer.flip();
+                    writePending = true;
+                    Nio2Endpoint.startInline();
+                    try {
+                        channel.write(buffer, writeTimeout, TimeUnit.MILLISECONDS,
+                                socketWrapper, completionHandler);
+                    } finally {
+                        // Always balance startInline(), even if write() throws
+                        Nio2Endpoint.endInline();
+                    }
+                    written = len;
+                }
+            }
+        }
+        return written;
+    }
+
+    /**
+     * Flushes the channel and waits up to the write timeout for the flush to
+     * finish. Does nothing while a non-blocking write is pending.
+     *
+     * @throws IOException if the flush is interrupted, fails or times out
+     */
+    @Override
+    protected void doFlush() throws IOException {
+        long writeTimeout = ((Nio2SocketWrapper) socketWrapper).getWriteTimeout();
+        if (writePending) {
+            return;
+        }
+        Future<Boolean> pendingFlush = channel.flush();
+        try {
+            pendingFlush.get(writeTimeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            pendingFlush.cancel(true);
+            onError(e);
+            // Preserve the interrupt for code further up the stack
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } catch (TimeoutException e) {
+            pendingFlush.cancel(true);
+            onError(e);
+            throw new IOException(e);
+        } catch (ExecutionException e) {
+            onError(e);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Closes the underlying channel. An {@link AsynchronousCloseException}
+     * raised while closing is deliberately not propagated to the caller.
+     *
+     * @throws IOException if closing the channel fails for any other reason
+     */
+    @Override
+    protected void doClose() throws IOException {
+        try {
+            channel.close();
+        } catch (AsynchronousCloseException ignored) {
+            // Intentionally swallowed: nothing more to do for this stream
+        }
+    }
+}

Added: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java?rev=1575905&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java Mon Mar 10 11:27:11 2014
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousByteChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.tomcat.util.net.SecureNio2Channel.ApplicationBufferHandler;
+
+/**
+ * Plain (non-SSL) wrapper around an {@link AsynchronousSocketChannel} used by
+ * the endpoint. The SSL-related methods are present but trivial here, so code
+ * using the channel does not have to distinguish the secure and non-secure
+ * cases.
+ */
+public class Nio2Channel implements AsynchronousByteChannel {
+
+    protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+    protected AsynchronousSocketChannel sc;
+
+    protected ApplicationBufferHandler bufHandler;
+
+    protected boolean sendFile;
+
+    /**
+     * Creates a wrapper for the given socket channel.
+     *
+     * @param channel    the socket channel all I/O is delegated to
+     * @param bufHandler supplier of the read and write buffers
+     */
+    public Nio2Channel(AsynchronousSocketChannel channel, ApplicationBufferHandler bufHandler) {
+        this.sc = channel;
+        this.bufHandler = bufHandler;
+    }
+
+    /**
+     * Resets the channel: clears the read and write buffers and turns the
+     * send-file flag off.
+     *
+     * @throws IOException declared by the signature; not thrown by this
+     *                     implementation
+     */
+    public void reset() throws IOException {
+        bufHandler.getReadBuffer().clear();
+        bufHandler.getWriteBuffer().clear();
+        sendFile = false;
+    }
+
+    /**
+     * Returns the combined capacity of the read and write buffers.
+     *
+     * @return the sum of the capacities of the buffers that are present, or
+     *         zero if there is no buffer handler
+     */
+    public int getBufferSize() {
+        if (bufHandler == null) {
+            return 0;
+        }
+        int total = 0;
+        if (bufHandler.getReadBuffer() != null) {
+            total += bufHandler.getReadBuffer().capacity();
+        }
+        if (bufHandler.getWriteBuffer() != null) {
+            total += bufHandler.getWriteBuffer().capacity();
+        }
+        return total;
+    }
+
+    /**
+     * Closes this channel by closing the wrapped socket channel.
+     *
+     * @throws IOException If an I/O error occurs
+     */
+    @Override
+    public void close() throws IOException {
+        getIOChannel().close();
+    }
+
+    /**
+     * Closes this channel if it is open or if the close is forced.
+     *
+     * @param force {@code true} to call {@link #close()} even when the
+     *              channel reports that it is not open
+     * @throws IOException If an I/O error occurs
+     */
+    public void close(boolean force) throws IOException {
+        if (isOpen() || force) {
+            close();
+        }
+    }
+
+    /**
+     * Tells whether or not this channel is open.
+     *
+     * @return {@code true} if, and only if, the wrapped channel is open
+     */
+    @Override
+    public boolean isOpen() {
+        return sc.isOpen();
+    }
+
+    /**
+     * @return the handler supplying the read and write buffers
+     */
+    public ApplicationBufferHandler getBufHandler() {
+        return bufHandler;
+    }
+
+    /**
+     * @return the wrapped socket channel
+     */
+    public AsynchronousSocketChannel getIOChannel() {
+        return sc;
+    }
+
+    /**
+     * @return always {@code false} in this implementation
+     */
+    public boolean isClosing() {
+        return false;
+    }
+
+    /**
+     * @return always {@code true} in this implementation
+     */
+    public boolean isHandshakeComplete() {
+        return true;
+    }
+
+    /**
+     * Performs the SSL handshake, which is a no-op for this non-secure
+     * implementation.
+     *
+     * @return Always returns zero
+     * @throws IOException declared by the signature; not thrown by this
+     *                     implementation
+     */
+    public int handshake() throws IOException {
+        return 0;
+    }
+
+    /**
+     * Replaces the wrapped socket channel.
+     *
+     * @param IOChannel the socket channel to delegate to from now on
+     */
+    public void setIOChannel(AsynchronousSocketChannel IOChannel) {
+        sc = IOChannel;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + ":" + sc.toString();
+    }
+
+    /**
+     * @return the current value of the send-file flag
+     */
+    public boolean isSendFile() {
+        return sendFile;
+    }
+
+    /**
+     * @param s the new value of the send-file flag
+     */
+    public void setSendFile(boolean s) {
+        sendFile = s;
+    }
+
+    /**
+     * Starts a read on the wrapped channel and returns its future.
+     */
+    @Override
+    public Future<Integer> read(ByteBuffer dst) {
+        return sc.read(dst);
+    }
+
+    /**
+     * Starts a read with a completion handler, using a timeout of
+     * {@link Integer#MAX_VALUE} milliseconds.
+     */
+    @Override
+    public <A> void read(ByteBuffer dst, A attachment,
+            CompletionHandler<Integer, ? super A> handler) {
+        read(dst, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, attachment, handler);
+    }
+
+    /**
+     * Starts a read with a completion handler and the given timeout on the
+     * wrapped channel.
+     */
+    public <A> void read(ByteBuffer dst,
+            long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Integer, ? super A> handler) {
+        sc.read(dst, timeout, unit, attachment, handler);
+    }
+
+    /**
+     * Starts a write on the wrapped channel and returns its future.
+     */
+    @Override
+    public Future<Integer> write(ByteBuffer src) {
+        return sc.write(src);
+    }
+
+    /**
+     * Starts a write with a completion handler, using a timeout of
+     * {@link Integer#MAX_VALUE} milliseconds.
+     */
+    @Override
+    public <A> void write(ByteBuffer src, A attachment,
+            CompletionHandler<Integer, ? super A> handler) {
+        write(src, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, attachment, handler);
+    }
+
+    /**
+     * Starts a write with a completion handler and the given timeout on the
+     * wrapped channel.
+     */
+    public <A> void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Integer, ? super A> handler) {
+        sc.write(src, timeout, unit, attachment, handler);
+    }
+
+    /**
+     * Starts a gathering write of the given buffers with a completion handler
+     * and the given timeout on the wrapped channel.
+     */
+    public <A> void write(ByteBuffer[] srcs, int offset, int length,
+            long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Long,? super A> handler) {
+        sc.write(srcs, offset, length, timeout, unit, attachment, handler);
+    }
+
+    /**
+     * Shared, already completed future whose result is always
+     * {@link Boolean#TRUE}; it cannot be cancelled.
+     */
+    private static final Future<Boolean> DONE = new Future<Boolean>() {
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+        @Override
+        public Boolean get() throws InterruptedException,
+                ExecutionException {
+            return Boolean.TRUE;
+        }
+        @Override
+        public Boolean get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException,
+                TimeoutException {
+            return Boolean.TRUE;
+        }
+    };
+
+    /**
+     * Flushes the channel. This implementation has nothing to flush and
+     * returns an already completed future.
+     *
+     * @return a completed future yielding {@link Boolean#TRUE}
+     * @throws IOException declared by the signature; not thrown by this
+     *                     implementation
+     */
+    public Future<Boolean> flush()
+            throws IOException {
+        return DONE;
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org