You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2017/05/04 10:11:24 UTC
svn commit: r1793763 - in /tomcat/trunk/java/org/apache/coyote/http2:
Http2AsyncUpgradeHandler.java Http2UpgradeHandler.java
Author: markt
Date: Thu May 4 10:11:24 2017
New Revision: 1793763
URL: http://svn.apache.org/viewvc?rev=1793763&view=rev
Log:
More refactoring to reduce duplication prior to adding trailer header support.
Modified:
tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java?rev=1793763&r1=1793762&r2=1793763&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java Thu May 4 10:11:24 2017
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolException;
import org.apache.coyote.Request;
-import org.apache.coyote.http2.HpackEncoder.State;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
@@ -135,67 +134,13 @@ public class Http2AsyncUpgradeHandler ex
@Override
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders,
boolean endOfStream, int payloadSize) throws IOException {
+ doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize);
+ }
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId,
- stream.getIdentifier(), Integer.valueOf(pushedStreamId),
- Boolean.valueOf(endOfStream)));
- }
-
- if (!stream.canWrite()) {
- return;
- }
-
- byte[] pushedStreamIdBytes = null;
- if (pushedStreamId > 0) {
- pushedStreamIdBytes = new byte[4];
- ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId);
- }
- boolean first = true;
- State state = null;
- ArrayList<ByteBuffer> bufs = new ArrayList<>();
-
- while (state != State.COMPLETE) {
- byte[] header = new byte[9];
- ByteBuffer payload = ByteBuffer.allocate(payloadSize);
- if (first && pushedStreamIdBytes != null) {
- payload.put(pushedStreamIdBytes);
- }
- state = getHpackEncoder().encode(mimeHeaders, payload);
- payload.flip();
- if (state == State.COMPLETE || payload.limit() > 0) {
- ByteUtil.setThreeBytes(header, 0, payload.limit());
- if (first) {
- first = false;
- if (pushedStreamIdBytes == null) {
- header[3] = FrameType.HEADERS.getIdByte();
- } else {
- header[3] = FrameType.PUSH_PROMISE.getIdByte();
- }
- if (endOfStream) {
- header[4] = FLAG_END_OF_STREAM;
- }
- } else {
- header[3] = FrameType.CONTINUATION.getIdByte();
- }
- if (state == State.COMPLETE) {
- header[4] += FLAG_END_OF_HEADERS;
- }
- if (log.isDebugEnabled()) {
- log.debug(payload.limit() + " bytes");
- }
- ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
- bufs.add(ByteBuffer.wrap(header));
- bufs.add(payload);
- } else if (state == State.UNDERFLOW) {
- payloadSize = payloadSize * 2;
- }
- }
- socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS,
- null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion,
- bufs.toArray(BYTEBUFFER_ARRAY));
- handleAsyncException();
+ @Override
+ protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) {
+ return new AsyncHeaderFrameBuffers(initialPayloadSize);
}
@@ -315,4 +260,53 @@ public class Http2AsyncUpgradeHandler ex
}
+
+ private class AsyncHeaderFrameBuffers implements HeaderFrameBuffers {
+
+ int payloadSize;
+
+ private byte[] header;
+ private ByteBuffer payload;
+
+ private final ArrayList<ByteBuffer> bufs = new ArrayList<>();
+
+ public AsyncHeaderFrameBuffers(int initialPayloadSize) {
+ this.payloadSize = initialPayloadSize;
+ }
+
+ @Override
+ public void startFrame() {
+ header = new byte[9];
+ payload = ByteBuffer.allocate(payloadSize);
+ }
+
+ @Override
+ public void endFrame() throws IOException {
+ bufs.add(ByteBuffer.wrap(header));
+ bufs.add(payload);
+ }
+
+ @Override
+ public void endHeaders() throws IOException {
+ socketWrapper.write(BlockingMode.SEMI_BLOCK, getWriteTimeout(), TimeUnit.MILLISECONDS,
+ null, SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion,
+ bufs.toArray(BYTEBUFFER_ARRAY));
+ handleAsyncException();
+ }
+
+ @Override
+ public byte[] getHeader() {
+ return header;
+ }
+
+ @Override
+ public ByteBuffer getPayload() {
+ return payload;
+ }
+
+ @Override
+ public void expandPayload() {
+ payloadSize = payloadSize * 2;
+ }
+ }
}
Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1793763&r1=1793762&r2=1793763&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Thu May 4 10:11:24 2017
@@ -524,6 +524,19 @@ class Http2UpgradeHandler extends Abstra
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders,
boolean endOfStream, int payloadSize) throws IOException {
+ // This ensures the Stream processing thread has control of the socket.
+ synchronized (socketWrapper) {
+ doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize);
+ }
+ }
+
+
+ /*
+ * Separate method to allow Http2AsyncUpgradeHandler to call this code
+ * without synchronizing on socketWrapper since it doesn't need to.
+ */
+ void doWriteHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders,
+ boolean endOfStream, int payloadSize) throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.writeHeaders", connectionId,
@@ -535,8 +548,7 @@ class Http2UpgradeHandler extends Abstra
return;
}
- byte[] header = new byte[9];
- ByteBuffer payload = ByteBuffer.allocate(payloadSize);
+ HeaderFrameBuffers headerFrameBuffers = getHeaderFrameBuffers(payloadSize);
byte[] pushedStreamIdBytes = null;
if (pushedStreamId > 0) {
@@ -547,49 +559,46 @@ class Http2UpgradeHandler extends Abstra
boolean first = true;
State state = null;
- // This ensures the Stream processing thread has control of the socket.
- synchronized (socketWrapper) {
- while (state != State.COMPLETE) {
- if (first && pushedStreamIdBytes != null) {
- payload.put(pushedStreamIdBytes);
- }
- state = getHpackEncoder().encode(mimeHeaders, payload);
- payload.flip();
- if (state == State.COMPLETE || payload.limit() > 0) {
- ByteUtil.setThreeBytes(header, 0, payload.limit());
- if (first) {
- first = false;
- if (pushedStreamIdBytes == null) {
- header[3] = FrameType.HEADERS.getIdByte();
- } else {
- header[3] = FrameType.PUSH_PROMISE.getIdByte();
- }
- if (endOfStream) {
- header[4] = FLAG_END_OF_STREAM;
- }
+ while (state != State.COMPLETE) {
+ headerFrameBuffers.startFrame();
+ if (first && pushedStreamIdBytes != null) {
+ headerFrameBuffers.getPayload().put(pushedStreamIdBytes);
+ }
+ state = getHpackEncoder().encode(mimeHeaders, headerFrameBuffers.getPayload());
+ headerFrameBuffers.getPayload().flip();
+ if (state == State.COMPLETE || headerFrameBuffers.getPayload().limit() > 0) {
+ ByteUtil.setThreeBytes(headerFrameBuffers.getHeader(), 0, headerFrameBuffers.getPayload().limit());
+ if (first) {
+ first = false;
+ if (pushedStreamIdBytes == null) {
+ headerFrameBuffers.getHeader()[3] = FrameType.HEADERS.getIdByte();
} else {
- header[3] = FrameType.CONTINUATION.getIdByte();
+ headerFrameBuffers.getHeader()[3] = FrameType.PUSH_PROMISE.getIdByte();
}
- if (state == State.COMPLETE) {
- header[4] += FLAG_END_OF_HEADERS;
- }
- if (log.isDebugEnabled()) {
- log.debug(payload.limit() + " bytes");
+ if (endOfStream) {
+ headerFrameBuffers.getHeader()[4] = FLAG_END_OF_STREAM;
}
- ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
- try {
- socketWrapper.write(true, header, 0, header.length);
- socketWrapper.write(true, payload);
- socketWrapper.flush(true);
- } catch (IOException ioe) {
- handleAppInitiatedIOException(ioe);
- }
- payload.clear();
- } else if (state == State.UNDERFLOW) {
- payload = ByteBuffer.allocate(payload.capacity() * 2);
+ } else {
+ headerFrameBuffers.getHeader()[3] = FrameType.CONTINUATION.getIdByte();
}
+ if (state == State.COMPLETE) {
+ headerFrameBuffers.getHeader()[4] += FLAG_END_OF_HEADERS;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(headerFrameBuffers.getPayload().limit() + " bytes");
+ }
+ ByteUtil.set31Bits(headerFrameBuffers.getHeader(), 5, stream.getIdentifier().intValue());
+ headerFrameBuffers.endFrame();
+ } else if (state == State.UNDERFLOW) {
+ headerFrameBuffers.expandPayload();
}
}
+ headerFrameBuffers.endHeaders();
+ }
+
+
+ protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) {
+ return new DefaultHeaderFrameBuffers(initialPayloadSize);
}
@@ -1570,7 +1579,7 @@ class Http2UpgradeHandler extends Abstra
}
- private enum ConnectionState {
+ private static enum ConnectionState {
NEW(true),
CONNECTED(true),
@@ -1587,5 +1596,65 @@ class Http2UpgradeHandler extends Abstra
public boolean isNewStreamAllowed() {
return newStreamsAllowed;
}
- }
+ }
+
+
+ protected static interface HeaderFrameBuffers {
+ public void startFrame();
+ public void endFrame() throws IOException;
+ public void endHeaders() throws IOException;
+ public byte[] getHeader();
+ public ByteBuffer getPayload();
+ public void expandPayload();
+ }
+
+
+ private class DefaultHeaderFrameBuffers implements HeaderFrameBuffers {
+
+ private final byte[] header;
+ private ByteBuffer payload;
+
+ public DefaultHeaderFrameBuffers(int initialPayloadSize) {
+ header = new byte[9];
+ payload = ByteBuffer.allocate(initialPayloadSize);
+ }
+
+ @Override
+ public void startFrame() {
+ // NO-OP
+ }
+
+
+ @Override
+ public void endFrame() throws IOException {
+ try {
+ socketWrapper.write(true, header, 0, header.length);
+ socketWrapper.write(true, payload);
+ socketWrapper.flush(true);
+ } catch (IOException ioe) {
+ handleAppInitiatedIOException(ioe);
+ }
+ payload.clear();
+ }
+
+ @Override
+ public void endHeaders() {
+ // NO-OP
+ }
+
+ @Override
+ public byte[] getHeader() {
+ return header;
+ }
+
+ @Override
+ public ByteBuffer getPayload() {
+ return payload;
+ }
+
+ @Override
+ public void expandPayload() {
+ payload = ByteBuffer.allocate(payload.capacity() * 2);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org