You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2015/05/20 21:21:52 UTC

svn commit: r1680661 - in /tomcat/trunk/java/org/apache/coyote/http2: AbstractStream.java Http2UpgradeHandler.java LocalStrings.properties Stream.java WriteStateMachine.java

Author: markt
Date: Wed May 20 19:21:52 2015
New Revision: 1680661

URL: http://svn.apache.org/r1680661
Log:
More plumbing for write.
Generally:
 - uses a state machine to co-ordinate writes across multiple threads
 - all writes are done on the main Connection thread, not on the Stream threads
 - if there is a read thread, see if we need to (and can) write when it finishes
 - written with an eye on non-blocking IO but that side of things isn't fully thought through 

Also some improvements to debug logging. Added a connection ID and made logs slightly less verbose.

Added:
    tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java   (with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1680661&r1=1680660&r2=1680661&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Wed May 20 19:21:52 2015
@@ -48,8 +48,9 @@ abstract class AbstractStream {
 
     public void rePrioritise(AbstractStream parent, boolean exclusive, int weight) {
         if (getLog().isDebugEnabled()) {
-            getLog().debug(sm.getString("abstractStream.reprioritisation.debug", identifier,
-                    Boolean.toString(exclusive), parent.getIdentifier(), Integer.toString(weight)));
+            getLog().debug(sm.getString("abstractStream.reprioritisation.debug",
+                    Long.toString(getConnectionId()), identifier, Boolean.toString(exclusive),
+                    parent.getIdentifier(), Integer.toString(weight)));
         }
 
         // Check if new parent is a descendant of this stream
@@ -120,4 +121,6 @@ abstract class AbstractStream {
     }
 
     protected abstract Log getLog();
+
+    protected abstract int getConnectionId();
 }

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1680661&r1=1680660&r2=1680661&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Wed May 20 19:21:52 2015
@@ -22,11 +22,15 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.servlet.http.WebConnection;
 
 import org.apache.coyote.Adapter;
 import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
+import org.apache.coyote.http2.WriteStateMachine.WriteState;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -52,6 +56,7 @@ public class Http2UpgradeHandler extends
     private static final Log log = LogFactory.getLog(Http2UpgradeHandler.class);
     private static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class);
 
+    private static final AtomicInteger connectionIdGenerator = new AtomicInteger(0);
     private static final Integer STREAM_ID_ZERO = Integer.valueOf(0);
 
     private static final int FRAME_TYPE_HEADERS = 1;
@@ -63,6 +68,7 @@ public class Http2UpgradeHandler extends
     private static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 };
     private static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00 };
 
+    private final int connectionId;
     private final Adapter adapter;
     private volatile SocketWrapperBase<?> socketWrapper;
     private volatile boolean initialized = false;
@@ -81,15 +87,22 @@ public class Http2UpgradeHandler extends
 
     private final Map<Integer,Stream> streams = new HashMap<>();
 
+    private final WriteStateMachine writeStateMachine = new WriteStateMachine();
+    private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>();
 
     public Http2UpgradeHandler(Adapter adapter) {
         super (STREAM_ID_ZERO);
         this.adapter = adapter;
+        this.connectionId = connectionIdGenerator.getAndIncrement();
     }
 
 
     @Override
     public void init(WebConnection unused) {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.init", Long.toString(connectionId)));
+        }
+
         initialized = true;
 
         // Send the initial settings frame
@@ -110,13 +123,21 @@ public class Http2UpgradeHandler extends
 
     @Override
     public SocketState upgradeDispatch(SocketStatus status) {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.upgradeDispatch.entry",
+                    Long.toString(connectionId), status));
+        }
+
         if (!initialized) {
             // WebConnection is not used so passing null here is fine
             init(null);
         }
 
+        SocketState result = SocketState.CLOSED;
+
         switch(status) {
         case OPEN_READ:
+            writeStateMachine.startRead();
             // Gets set to null once the connection preface has been
             // successfully parsed.
             if (connectionPrefaceParser != null) {
@@ -124,10 +145,11 @@ public class Http2UpgradeHandler extends
                     if (connectionPrefaceParser.isError()) {
                         // Any errors will have already been logged.
                         close();
-                        return SocketState.CLOSED;
+                        break;
                     } else {
                         // Incomplete
-                        return SocketState.LONG;
+                        result = SocketState.UPGRADED;
+                        break;
                     }
                 }
             }
@@ -142,7 +164,7 @@ public class Http2UpgradeHandler extends
                     // Connection error
                     log.warn(sm.getString("upgradeHandler.connectionError"), h2e);
                     close(h2e);
-                    return SocketState.CLOSED;
+                    break;
                 } else {
                     // Stream error
                     // TODO Reset stream
@@ -152,15 +174,22 @@ public class Http2UpgradeHandler extends
                     log.debug(sm.getString("upgradeHandler.processFrame.ioerror"), ioe);
                 }
                 close();
-                return SocketState.CLOSED;
+                result = SocketState.CLOSED;
+                break;
             }
 
-            // TODO process writes
+            if (writeStateMachine.endRead()) {
+                processWrites();
+            }
 
-            return SocketState.LONG;
+            result = SocketState.UPGRADED;
+            break;
 
         case OPEN_WRITE:
-            // TODO
+            if (writeStateMachine.startWrite()) {
+                processWrites();
+            }
+            result = SocketState.UPGRADED;
             break;
 
         case ASYNC_READ_ERROR:
@@ -178,13 +207,15 @@ public class Http2UpgradeHandler extends
             // For all of the above, including the unexpected values, close the
             // connection.
             close();
-            return SocketState.CLOSED;
+            result = SocketState.CLOSED;
+            break;
         }
 
-        // TODO This is for debug purposes to make sure ALPN is working.
-        log.fatal("TODO: Handle SocketStatus: " + status);
-        close();
-        return SocketState.CLOSED;
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("upgradeHandler.upgradeDispatch.exit",
+                    Long.toString(connectionId), result));
+        }
+        return result;
     }
 
 
@@ -224,8 +255,8 @@ public class Http2UpgradeHandler extends
     private void processFrameHeaders(int flags, int streamId, int payloadSize) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.processFrame",
-                    Integer.toString(FRAME_TYPE_HEADERS), Integer.toString(flags),
-                    Integer.toString(streamId), Integer.toString(payloadSize)));
+                    Long.toString(connectionId), Integer.toString(streamId),
+                    Integer.toString(flags), Integer.toString(payloadSize)));
         }
 
         // Validate the stream
@@ -309,8 +340,8 @@ public class Http2UpgradeHandler extends
     private void processFramePriority(int flags, int streamId, int payloadSize) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.processFrame",
-                    Integer.toString(FRAME_TYPE_PRIORITY), Integer.toString(flags),
-                    Integer.toString(streamId), Integer.toString(payloadSize)));
+                    Long.toString(connectionId), Integer.toString(streamId),
+                    Integer.toString(flags), Integer.toString(payloadSize)));
         }
         // Validate the frame
         if (streamId == 0) {
@@ -353,8 +384,8 @@ public class Http2UpgradeHandler extends
     private void processFrameSettings(int flags, int streamId, int payloadSize) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.processFrame",
-                    Integer.toString(FRAME_TYPE_SETTINGS), Integer.toString(flags),
-                    Integer.toString(streamId), Integer.toString(payloadSize)));
+                    Long.toString(connectionId), Integer.toString(streamId),
+                    Integer.toString(flags), Integer.toString(payloadSize)));
         }
         // Validate the frame
         if (streamId != 0) {
@@ -401,8 +432,8 @@ public class Http2UpgradeHandler extends
             throws IOException {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.processFrame",
-                    Integer.toString(FRAME_TYPE_WINDOW_UPDATE), Integer.toString(flags),
-                    Integer.toString(streamId), Integer.toString(payloadSize)));
+                    Long.toString(connectionId), Integer.toString(streamId),
+                    Integer.toString(flags), Integer.toString(payloadSize)));
         }
         // Validate the frame
         if (payloadSize != 4) {
@@ -417,7 +448,8 @@ public class Http2UpgradeHandler extends
 
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.processFrameWindowUpdate.debug",
-                    Integer.toString(streamId), Integer.toString(windowSizeIncrement)));
+                    Long.toString(connectionId), Integer.toString(streamId),
+                    Integer.toString(windowSizeIncrement)));
         }
 
         // Validate the data
@@ -579,6 +611,44 @@ public class Http2UpgradeHandler extends
     }
 
 
+    private void processWrites() {
+        Object obj;
+        while ((obj = getThingToWrite()) != null) {
+            // TODO
+            log.debug("TODO: write [" + obj.toString() + "]");
+        }
+    }
+
+
+    private Object getThingToWrite() {
+        synchronized (writeStateMachine) {
+            // TODO This is more complicated than pulling an object off a queue.
+
+            // Note: The checking of the queue for something to write and the
+            //       calling of endWrite() if nothing is found must be kept
+            //       within the same sync to avoid race conditions with adding
+            //       entries to the queue.
+            Object obj = writeQueue.poll();
+            if (obj == null) {
+                 writeStateMachine.endWrite(WriteState.IDLE);
+            }
+            return obj;
+        }
+    }
+
+
+    void addWrite(Object obj) {
+        boolean needDispatch;
+        synchronized (writeStateMachine) {
+            writeQueue.add(obj);
+            needDispatch = writeStateMachine.addWrite();
+        }
+        if (needDispatch) {
+            socketWrapper.processSocket(SocketStatus.OPEN_WRITE, true);
+        }
+    }
+
+
     private Stream getStream(int streamId) {
         Integer key = Integer.valueOf(streamId);
 
@@ -608,6 +678,12 @@ public class Http2UpgradeHandler extends
     }
 
 
+    @Override
+    protected final int getConnectionId() {
+        return connectionId;
+    }
+
+
     @Override
     protected final Log getLog() {
         return log;

Modified: tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties?rev=1680661&r1=1680660&r2=1680661&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties Wed May 20 19:21:52 2015
@@ -13,7 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-abstractStream.reprioritisation.debug=Reprioritising stream [{0}] with exclusive [{1}], parent [{2}] and weight [{3}]
+abstractStream.reprioritisation.debug=Connection [{0}], Stream [{1}], Exclusive [{2}], Parent [{3}], Weight [{4}]
+
 connectionPrefaceParser.eos=Unexpected end of stream while reading opening client preface byte sequence. Only [{0}] bytes read.
 connectionPrefaceParser.ioError=Failed to read opening client preface byte sequence
 connectionPrefaceParser.mismatch=An unexpected byte sequence was received at the start of the client preface [{0}]
@@ -30,13 +31,15 @@ hpackdecoder.zeroNotValidHeaderTableInde
 
 hpackhuffman.huffmanEncodedHpackValueDidNotEndWithEOS=Huffman encoded value in HPACK headers did not end with EOS padding
 
-stream.header.debug=Stream [{0}] recieved HTTP header [{1}] with value [{2}]
+stream.header.debug=Connection [{0}], Stream [{1}], HTTP header [{2}], Value [{3}]
+stream.write=Connection [{0}], Stream [{1}]
 
 streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams
 
 upgradeHandler.connectionError=An error occurred that requires the HTTP/2 connection to be closed.
+upgradeHandler.init=Connection [{0}]
 upgradeHandler.payloadTooBig=The payload is [{0}] bytes long but the maximum frame size is [{1}]
-upgradeHandler.processFrame=Processing frame of type [{0}] for stream [{2}] with flags [{1}] and payload size [{3}]
+upgradeHandler.processFrame=Connection [{0}], Stream [{1}], Flags [{2}], Payload size [{3}]
 upgradeHandler.processFrame.ioerror=An I/O error occurred while reading an incoming HTTP/2 frame
 upgradeHandler.processFrameHeaders.invalidStream=Headers frame received for stream [0]
 upgradeHandler.processFrameHeaders.decodingFailed=There was an error during the HPACK decoding of HTTP headers
@@ -46,11 +49,17 @@ upgradeHandler.processFramePriority.inva
 upgradeHandler.processFrameSettings.ackWithNonZeroPayload=Settings frame received with the ACK flag set and payload present
 upgradeHandler.processFrameSettings.invalidPayloadSize=Settings frame received with a payload size of [{0}] which is not a multiple of 6
 upgradeHandler.processFrameSettings.invalidStream=Settings frame received for stream [{0}]
-upgradeHandler.processFrameWindowUpdate.debug=Received notification to increment the flow control window for stream [{0}] by [{1}]
+upgradeHandler.processFrameWindowUpdate.debug=Connection [{0}], Stream [{1}], Window size increment [{2}]
 upgradeHandler.processFrameWindowUpdate.invalidIncrement=Window update frame received with an invalid increment size of [0]
 upgradeHandler.processFrameWindowUpdate.invalidPayloadSize=Window update frame received with an invalid payload size of [{0}]
 upgradeHandler.receivePrefaceNotSettings=The first frame received from the client was not a settings frame
 upgradeHandler.sendPrefaceFail=Failed to send preface to client
 upgradeHandler.socketCloseFailed=Error closing socket
 upgradeHandler.unexpectedEos=Unexpected end of stream
-upgradeHandler.unexpectedStatus=An unexpected value of status ([{0}]) was passed to this method
\ No newline at end of file
+upgradeHandler.unexpectedStatus=An unexpected value of status ([{0}]) was passed to this method
+upgradeHandler.upgradeDispatch.entry=Entry, Connection [{0}], SocketStatus [{1}]
+upgradeHandler.upgradeDispatch.exit=Exit, Connection [{0}], SocketState [{1}]
+
+
+writeStateMachine.endWrite.ise=It is illegal to specify [{0}] for the new state once a write has completed
+writeStateMachine.ise=It is illegal to call [{0}()] in state [{1}]
\ No newline at end of file

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1680661&r1=1680660&r2=1680661&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Wed May 20 19:21:52 2015
@@ -56,7 +56,8 @@ public class Stream extends AbstractStre
     @Override
     public void emitHeader(String name, String value, boolean neverIndex) {
         if (log.isDebugEnabled()) {
-            log.debug(sm.getString("stream.header.debug", getIdentifier(), name, value));
+            log.debug(sm.getString("stream.header.debug",
+                    Long.toString(getConnectionId()), getIdentifier(), name, value));
         }
 
         switch(name) {
@@ -95,13 +96,23 @@ public class Stream extends AbstractStre
 
 
     void writeHeaders() {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("stream.write",
+                    Long.toString(getConnectionId()), getIdentifier()));
+        }
         // Format the frames.
         // TODO
+        handler.addWrite("HEADERS");
     }
 
 
     void flushData() {
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("stream.write",
+                    Long.toString(getConnectionId()), getIdentifier()));
+        }
         // TODO
+        handler.addWrite("DATA");
     }
 
 
@@ -111,6 +122,12 @@ public class Stream extends AbstractStre
     }
 
 
+    @Override
+    protected final int getConnectionId() {
+        return getParentStream().getConnectionId();
+    }
+
+
     public Request getCoyoteRequest() {
         return coyoteRequest;
     }

Added: tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java?rev=1680661&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java Wed May 20 19:21:52 2015
@@ -0,0 +1,207 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http2;
+
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * TODO. ASCII art state diagram once the state machine is stable.
+ */
+public class WriteStateMachine {
+
+    private static final StringManager sm = StringManager.getManager(WriteStateMachine.class);
+
+    private WriteState state = WriteState.IDLE;
+
+
+    synchronized void startRead() {
+        switch(state) {
+        case IDLE: {
+            state = WriteState.READ_IN_PROGRESS;
+            break;
+        }
+        case WRITE_PENDING: {
+            // NO-OP. Race condition between stream calling write() and poller
+            // triggering OPEN_READ
+            break;
+        }
+        case WRITE_PENDING_BLOCKED_IO:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS: {
+            // NO-OP for now. Incoming data may unblock flow control blocks
+            break;
+        }
+        case READ_IN_PROGRESS:
+        case WRITING: {
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "startRead", state));
+        }
+        }
+    }
+
+    /**
+     * @return <code>true</code> if the state changed to WRITING.
+     */
+    synchronized boolean endRead() {
+        switch(state) {
+        case READ_IN_PROGRESS: {
+            state = WriteState.IDLE;
+            return false;
+        }
+        case WRITE_PENDING: {
+            state = WriteState.WRITING;
+            return true;
+        }
+        case IDLE:
+        case WRITING:
+        case WRITE_PENDING_BLOCKED_IO:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "endRead", state));
+        }
+        // Never reaches here. This is just to keep the compiler happy.
+        return false;
+    }
+
+
+    synchronized void windowOpenedStream() {
+        switch (state) {
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS:
+            state = WriteState.WRITE_PENDING;
+            break;
+        case READ_IN_PROGRESS:
+        case WRITE_PENDING:
+        case WRITE_PENDING_BLOCKED_IO:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+            // NO-OP
+            break;
+        case IDLE:
+        case WRITING:
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "windowOpenedStream", state));
+        }
+    }
+
+
+    synchronized void windowOpenedConnection() {
+        switch (state) {
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+            state = WriteState.WRITE_PENDING;
+            break;
+        case READ_IN_PROGRESS:
+        case WRITE_PENDING:
+        case WRITE_PENDING_BLOCKED_IO:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS:
+            // NO-OP
+            break;
+        case IDLE:
+        case WRITING:
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "windowOpenedConnection", state));
+        }
+    }
+
+
+    synchronized boolean startWrite() {
+        switch (state) {
+        case WRITE_PENDING:
+        case WRITE_PENDING_BLOCKED_IO: {
+            state = WriteState.WRITING;
+            return true;
+        }
+        case IDLE:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS: {
+            // NO-OP. Race condition between stream calling write() and poller
+            // triggering OPEN_READ
+            return false;
+        }
+        case READ_IN_PROGRESS:
+        case WRITING:
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "startWrite", state));
+        }
+        // Never reaches here. This is just to keep the compiler happy.
+        return false;
+    }
+
+
+    synchronized void endWrite(WriteState newState) {
+        switch (state) {
+        case WRITING: {
+            switch (newState) {
+            case IDLE:
+            case WRITE_PENDING_BLOCKED_IO:
+            case WRITE_PENDING_BLOCKED_FLOW_STREAMS:
+            case WRITE_PENDING_BLOCKED_FLOW_CONNECTION: {
+                state = newState;
+                break;
+            }
+            case WRITE_PENDING:
+            case WRITING:
+            case READ_IN_PROGRESS:
+                throw new IllegalStateException(
+                        sm.getString("writeStateMachine.endWrite.ise", newState));
+            }
+            break;
+        }
+        case IDLE:
+        case WRITE_PENDING:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS:
+        case WRITE_PENDING_BLOCKED_IO:
+        case READ_IN_PROGRESS:
+            throw new IllegalStateException(
+                    sm.getString("writeStateMachine.ise", "endWrite", state));
+        }
+    }
+
+
+    /**
+     * @return <code>true</code> if there needs to be a dispatch for OPEN_WRITE
+     *         to trigger the actual write.
+     */
+    synchronized boolean addWrite() {
+        switch(state) {
+        case IDLE:
+        case READ_IN_PROGRESS:
+        case WRITE_PENDING_BLOCKED_FLOW_STREAMS: {
+            state = WriteState.WRITE_PENDING;
+            return true;
+        }
+        case WRITE_PENDING:
+        case WRITE_PENDING_BLOCKED_FLOW_CONNECTION:
+        case WRITE_PENDING_BLOCKED_IO:
+        case WRITING:
+            // NO-OP
+            return false;
+        }
+        // Never reaches here. This is just to keep the compiler happy.
+        return false;
+    }
+
+    static enum WriteState {
+        IDLE,
+        READ_IN_PROGRESS,
+        WRITE_PENDING,
+        WRITE_PENDING_BLOCKED_IO,
+        WRITE_PENDING_BLOCKED_FLOW_STREAMS,
+        WRITE_PENDING_BLOCKED_FLOW_CONNECTION,
+        WRITING
+    }
+}

Propchange: tomcat/trunk/java/org/apache/coyote/http2/WriteStateMachine.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1680661 - in /tomcat/trunk/java/org/apache/coyote/http2: AbstractStream.java Http2UpgradeHandler.java LocalStrings.properties Stream.java WriteStateMachine.java

Posted by Mark Thomas <ma...@apache.org>.
On 20/05/2015 20:21, markt@apache.org wrote:
> Author: markt
> Date: Wed May 20 19:21:52 2015
> New Revision: 1680661
> 
> URL: http://svn.apache.org/r1680661
> Log:
> More plumbing for write.
> Generally:
>  - uses a state machine to co-ordinate writes across multiple threads
>  - all writes are done on the main Connection thread, not on the Stream threads
>  - if there is a read thread, see if we need to (and can) write when it finishes
>  - written with an eye on non-blocking IO but that side of things isn't fully thought through 

With my simple test for http://localhost:8443/examples I now see the
fake header write (for the redirect) and the fake write for the final
response flush in the logs. The next step is to turn these into real
writes and get the response back to the client.

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org