You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/10/31 17:33:33 UTC
svn commit: r1767339 [8/14] - in /httpcomponents/httpcore/trunk: ./
httpcore5-ab/src/main/java/org/apache/hc/core5/http/benchmark/
httpcore5-ab/src/test/java/org/apache/hc/core5/http/benchmark/
httpcore5-h2/src/main/java/org/apache/hc/core5/http2/boots...
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java Mon Oct 31 17:33:27 2016
@@ -24,7 +24,7 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http.pool.io;
+package org.apache.hc.core5.http.impl.io.pool;
import java.io.IOException;
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java Mon Oct 31 17:33:27 2016
@@ -29,4 +29,4 @@
* Default implementations of client side connection pools
* for synchronous, blocking communication.
*/
-package org.apache.hc.core5.http.pool.io;
+package org.apache.hc.core5.http.impl.io.pool;
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java Mon Oct 31 17:33:27 2016
@@ -24,7 +24,7 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http2.impl.nio;
+package org.apache.hc.core5.http.impl.nio;
import java.io.IOException;
import java.io.InputStream;
@@ -45,17 +45,18 @@ import org.apache.hc.core5.http.HttpRequ
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.impl.nio.entity.SharedInputBuffer;
+import org.apache.hc.core5.http.impl.nio.entity.SharedOutputBuffer;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.entity.ContentInputStream;
import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
-import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http2.nio.CapacityChannel;
-import org.apache.hc.core5.http2.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.nio.ExpectationChannel;
-import org.apache.hc.core5.http2.nio.ResponseChannel;
-import org.apache.hc.core5.http2.impl.nio.entity.SharedInputBuffer;
-import org.apache.hc.core5.http2.impl.nio.entity.SharedOutputBuffer;
+import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
@@ -71,6 +72,7 @@ public abstract class AbstractClassicSer
private final AtomicReference<State> state;
private final AtomicReference<Exception> exception;
+ private volatile HttpContext context;
private volatile SharedInputBuffer inputBuffer;
private volatile SharedOutputBuffer outputBuffer;
@@ -85,9 +87,15 @@ public abstract class AbstractClassicSer
return exception.get();
}
+ @Override
+ public void setContext(final HttpContext context) {
+ this.context = context;
+ }
+
protected abstract void handle(
HttpRequest request, InputStream requestStream,
- HttpResponse response, OutputStream responseStream) throws IOException, HttpException;
+ HttpResponse response, OutputStream responseStream,
+ HttpContext context) throws IOException, HttpException;
@Override
public final void verify(
@@ -232,7 +240,7 @@ public abstract class AbstractClassicSer
@Override
public void run() {
try {
- handle(request, inputStream, responseWrapper, outputStream);
+ handle(request, inputStream, responseWrapper, outputStream, context);
if (inputStream != null) {
inputStream.close();
}
@@ -253,11 +261,6 @@ public abstract class AbstractClassicSer
}
@Override
- public final int capacity() {
- return inputBuffer != null ? inputBuffer.available() : 0;
- }
-
- @Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
if (inputBuffer != null) {
inputBuffer.updateCapacity(capacityChannel);
@@ -265,9 +268,9 @@ public abstract class AbstractClassicSer
}
@Override
- public final void consume(final ByteBuffer src) throws IOException {
+ public final int consume(final ByteBuffer src) throws IOException {
Asserts.notNull(inputBuffer, "Input buffer");
- inputBuffer.fill(src);
+ return inputBuffer.fill(src);
}
@Override
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,137 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.hc.core5.http.HttpConnectionMetrics;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+
+class AbstractHttp1IOEventHandler implements HttpConnectionEventHandler {
+
+ private final AbstractHttp1StreamDuplexer<?, ?> streamDuplexer;
+
+ AbstractHttp1IOEventHandler(final AbstractHttp1StreamDuplexer<?, ?> streamDuplexer) {
+ this.streamDuplexer = Args.notNull(streamDuplexer, "Stream multiplexer");
+ }
+
+ @Override
+ public void connected(final IOSession session) {
+ try {
+ streamDuplexer.onConnect();
+ } catch (final Exception ex) {
+ streamDuplexer.onException(ex);
+ }
+ }
+
+ @Override
+ public void inputReady(final IOSession session) {
+ try {
+ streamDuplexer.onInput();
+ } catch (final Exception ex) {
+ streamDuplexer.onException(ex);
+ }
+ }
+
+ @Override
+ public void outputReady(final IOSession session) {
+ try {
+ streamDuplexer.onOutput();
+ } catch (final Exception ex) {
+ streamDuplexer.onException(ex);
+ }
+ }
+
+ @Override
+ public void timeout(final IOSession session) {
+ try {
+ streamDuplexer.onTimeout();
+ } catch (final Exception ex) {
+ streamDuplexer.onException(ex);
+ }
+ }
+
+ @Override
+ public void disconnected(final IOSession session) {
+ streamDuplexer.onDisconnect();
+ }
+
+ @Override
+ public void close() throws IOException {
+ streamDuplexer.close();
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ streamDuplexer.shutdown();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return streamDuplexer.isOpen();
+ }
+
+ @Override
+ public void setSocketTimeout(final int timeout) {
+ streamDuplexer.setSocketTimeout(timeout);
+ }
+
+ @Override
+ public HttpConnectionMetrics getMetrics() {
+ return streamDuplexer.getMetrics();
+ }
+
+ @Override
+ public int getSocketTimeout() {
+ return streamDuplexer.getSocketTimeout();
+ }
+
+ @Override
+ public ProtocolVersion getProtocolVersion() {
+ return streamDuplexer.getProtocolVersion();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return streamDuplexer.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return streamDuplexer.getLocalAddress();
+ }
+
+ @Override
+ public String toString() {
+ return streamDuplexer.toString();
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,492 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.Deque;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpConnectionMetrics;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpMessage;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.ConnSupport;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.ResourceHolder;
+import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownType;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.HeapByteBufferAllocator;
+import org.apache.hc.core5.util.NetUtils;
+
+abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
+ implements ResourceHolder, HttpConnection {
+
+ private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
+
+ private final IOSession ioSession;
+ private final SessionInputBufferImpl inbuf;
+ private final SessionOutputBufferImpl outbuf;
+ private final BasicHttpTransportMetrics inTransportMetrics;
+ private final BasicHttpTransportMetrics outTransportMetrics;
+ private final BasicHttpConnectionMetrics connMetrics;
+ private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
+ private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
+ private final ConnectionListener connectionListener;
+ private final Lock outputLock;
+
+ private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
+ private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
+ private volatile ConnectionState connState = ConnectionState.READY;
+
+ private volatile ProtocolVersion version;
+
+ AbstractHttp1StreamDuplexer(
+ final IOSession ioSession,
+ final ConnectionConfig connectionConfig,
+ final NHttpMessageParser<IncomingMessage> incomingMessageParser,
+ final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
+ final ConnectionListener connectionListener) {
+ this.ioSession = Args.notNull(ioSession, "I/O session");
+ final int bufferSize = connectionConfig.getBufferSize();
+ this.inbuf = new SessionInputBufferImpl(bufferSize,
+ bufferSize < 512 ? bufferSize : 512,
+ ConnSupport.createDecoder(connectionConfig),
+ HeapByteBufferAllocator.INSTANCE);
+ this.outbuf = new SessionOutputBufferImpl(bufferSize,
+ bufferSize < 512 ? bufferSize : 512,
+ ConnSupport.createEncoder(connectionConfig),
+ HeapByteBufferAllocator.INSTANCE);
+ this.inTransportMetrics = new BasicHttpTransportMetrics();
+ this.outTransportMetrics = new BasicHttpTransportMetrics();
+ this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
+ this.incomingMessageParser = incomingMessageParser;
+ this.outgoingMessageWriter = outgoingMessageWriter;
+ this.connectionListener = connectionListener;
+ this.outputLock = new ReentrantLock();
+ this.connState = ConnectionState.READY;
+ }
+
+ void doTerminate(final Exception exception) {
+ connState = ConnectionState.SHUTDOWN;
+ try {
+ terminate(exception);
+ } finally {
+ ioSession.shutdown();
+ }
+ }
+
+ abstract void terminate(final Exception exception);
+
+ abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
+
+ abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
+
+ abstract void consumeHeader(IncomingMessage messageHead, boolean endStream) throws HttpException, IOException;
+
+ abstract ContentDecoder handleIncomingMessage(
+ IncomingMessage incomingMessage,
+ ReadableByteChannel channel,
+ SessionInputBuffer buffer,
+ BasicHttpTransportMetrics metrics) throws HttpException;
+
+ abstract ContentEncoder handleOutgoingMessage(
+ OutgoingMessage outgoingMessage,
+ WritableByteChannel channel,
+ SessionOutputBuffer buffer,
+ BasicHttpTransportMetrics metrics) throws HttpException;
+
+ abstract void consumeData(ContentDecoder contentDecoder) throws HttpException, IOException;
+
+ abstract boolean isOutputReady();
+
+ abstract void produceOutput() throws HttpException, IOException;
+
+ abstract void execute(ExecutionCommand executionCommand) throws HttpException, IOException;
+
+ abstract void inputEnd() throws HttpException, IOException;
+
+ abstract void outputEnd() throws HttpException, IOException;
+
+ abstract boolean inputIdle();
+
+ abstract boolean outputIdle();
+
+ abstract boolean handleTimeout();
+
+ private void processCommands() throws HttpException, IOException {
+ for (;;) {
+ final Command command = ioSession.getCommandQueue().poll();
+ if (command == null) {
+ return;
+ }
+ if (command instanceof ShutdownCommand) {
+ final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
+ requestShutdown(shutdownCommand.getType());
+ } else if (command instanceof ExecutionCommand) {
+ if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
+ command.cancel();
+ } else {
+ execute((ExecutionCommand) command);
+ return;
+ }
+ } else {
+ throw new HttpException("Unexpected command: " + command.getClass());
+ }
+ }
+ }
+
+ public final void onConnect() throws HttpException, IOException {
+ if (connectionListener != null) {
+ connectionListener.onConnect(this);
+ }
+ connState = ConnectionState.ACTIVE;
+ processCommands();
+ }
+
+ public final void onInput() throws HttpException, IOException {
+ do {
+ if (incomingMessage == null) {
+
+ if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
+ ioSession.clearEvent(SelectionKey.OP_READ);
+ return;
+ }
+
+ int bytesRead;
+ do {
+ bytesRead = inbuf.fill(ioSession.channel());
+ if (bytesRead > 0) {
+ inTransportMetrics.incrementBytesTransferred(bytesRead);
+ }
+ final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
+ if (messageHead != null) {
+ incomingMessageParser.reset();
+
+ this.version = messageHead.getVersion();
+
+ updateInputMetrics(messageHead, connMetrics);
+ final ContentDecoder contentDecoder = handleIncomingMessage(messageHead, ioSession.channel(), inbuf, inTransportMetrics);
+ consumeHeader(messageHead, contentDecoder == null);
+ if (contentDecoder != null) {
+ incomingMessage = new Message<>(messageHead, contentDecoder);
+ break;
+ } else {
+ inputEnd();
+ }
+ }
+ } while (bytesRead > 0);
+
+ if (bytesRead == -1 && !inbuf.hasData()) {
+ if (incomingMessage == null && outgoingMessage == null) {
+ requestShutdown(ShutdownType.IMMEDIATE);
+ } else {
+ doTerminate(new ConnectionClosedException("Connection closed by peer"));
+ }
+ return;
+ }
+ }
+
+ if (incomingMessage != null) {
+ final ContentDecoder contentDecoder = incomingMessage.getBody();
+ consumeData(contentDecoder);
+ if (contentDecoder.isCompleted()) {
+ incomingMessage = null;
+ inputEnd();
+ }
+ }
+ } while (connState.compareTo(ConnectionState.SHUTDOWN) < 0 && inbuf.hasData());
+ }
+
+ public final void onOutput() throws IOException, HttpException {
+ outputLock.lock();
+ try {
+ if (outbuf.hasData()) {
+ final int bytesWritten = outbuf.flush(ioSession.channel());
+ if (bytesWritten > 0) {
+ outTransportMetrics.incrementBytesTransferred(bytesWritten);
+ }
+ }
+ } finally {
+ outputLock.unlock();
+ }
+ if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) < 0) {
+ if (isOutputReady()) {
+ produceOutput();
+ } else {
+ outputLock.lock();
+ try {
+ if (!outbuf.hasData()) {
+ ioSession.clearEvent(SelectionKey.OP_WRITE);
+ }
+ } finally {
+ outputLock.unlock();
+ }
+ }
+ outputLock.lock();
+ final boolean outputEnd;
+ try {
+ outputEnd = outgoingMessage == null && !outbuf.hasData();
+ } finally {
+ outputLock.unlock();
+ }
+ if (outputEnd) {
+ outputEnd();
+ processCommands();
+ }
+ }
+ outputLock.lock();
+ try {
+ if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
+ connState = ConnectionState.SHUTDOWN;
+ }
+ if (!outbuf.hasData() && connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
+ ioSession.close();
+ cancelPendingCommands();
+ releaseResources();
+ }
+ } finally {
+ outputLock.unlock();
+ }
+ }
+
+ public final void onTimeout() throws IOException, HttpException {
+ if (!handleTimeout()) {
+ doTerminate(new SocketTimeoutException());
+ }
+ }
+
+ public final void onException(final Exception ex) {
+ doTerminate(ex);
+ if (connectionListener != null) {
+ connectionListener.onError(this, ex);
+ }
+ }
+
+ public final void onDisconnect() {
+ cancelPendingCommands();
+ releaseResources();
+ if (connectionListener != null) {
+ connectionListener.onDisconnect(this);
+ }
+ }
+
+ private void cancelPendingCommands() {
+ final Deque<Command> commandQueue = ioSession.getCommandQueue();
+ for (;;) {
+ final Command command = commandQueue.poll();
+ if (command != null) {
+ command.cancel();
+ } else {
+ break;
+ }
+ }
+ }
+
+ void requestShutdown(final ShutdownType shutdownType) {
+ switch (shutdownType) {
+ case GRACEFUL:
+ if (connState == ConnectionState.ACTIVE) {
+ connState = ConnectionState.GRACEFUL_SHUTDOWN;
+ }
+ break;
+ case IMMEDIATE:
+ connState = ConnectionState.SHUTDOWN;
+ break;
+ }
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+
+ void commitMessageHead(final OutgoingMessage messageHead, final boolean endStream) throws HttpException, IOException {
+ outputLock.lock();
+ try {
+ outgoingMessageWriter.write(messageHead, outbuf);
+ updateOutputMetrics(messageHead, connMetrics);
+ if (!endStream) {
+ final ContentEncoder contentEncoder = handleOutgoingMessage(messageHead, ioSession.channel(), outbuf, outTransportMetrics);
+ if (contentEncoder != null) {
+ outgoingMessage = new Message<>(messageHead, contentEncoder);
+ }
+ }
+ outgoingMessageWriter.reset();
+ ioSession.setEvent(EventMask.WRITE);
+ } finally {
+ outputLock.unlock();
+ }
+ }
+
+ void requestSessionInput() {
+ ioSession.setEvent(SelectionKey.OP_READ);
+ }
+
+ void suspendSessionInput() {
+ ioSession.clearEvent(SelectionKey.OP_READ);
+ }
+
+ void requestSessionOutput() {
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+
+ void suspendSessionOutput() {
+ ioSession.clearEvent(SelectionKey.OP_WRITE);
+ }
+
+ int streamOutput(final ByteBuffer src) throws IOException {
+ outputLock.lock();
+ try {
+ if (outgoingMessage == null) {
+ throw new ClosedChannelException();
+ }
+ final ContentEncoder contentEncoder = outgoingMessage.getBody();
+ final int bytesWritten = contentEncoder.write(src);
+ if (bytesWritten > 0) {
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+ return bytesWritten;
+ } finally {
+ outputLock.unlock();
+ }
+ }
+
+ enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
+
+ MessageDelineation endOutputStream() throws IOException {
+ outputLock.lock();
+ try {
+ if (outgoingMessage == null) {
+ return MessageDelineation.NONE;
+ }
+ final ContentEncoder contentEncoder = outgoingMessage.getBody();
+ contentEncoder.complete();
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ outgoingMessage = null;
+ if (contentEncoder instanceof ChunkEncoder) {
+ return MessageDelineation.CHUNK_CODED;
+ } else {
+ return MessageDelineation.MESSAGE_HEAD;
+ }
+ } finally {
+ outputLock.unlock();
+ }
+ }
+
+ boolean isOutputCompleted() {
+ outputLock.lock();
+ try {
+ if (outgoingMessage == null) {
+ return true;
+ }
+ final ContentEncoder contentEncoder = outgoingMessage.getBody();
+ return contentEncoder.isCompleted();
+ } finally {
+ outputLock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return connState == ConnectionState.ACTIVE;
+ }
+
+ @Override
+ public void setSocketTimeout(final int timeout) {
+ ioSession.setSocketTimeout(timeout);
+ }
+
+ @Override
+ public HttpConnectionMetrics getMetrics() {
+ return connMetrics;
+ }
+
+ @Override
+ public int getSocketTimeout() {
+ return ioSession.getSocketTimeout();
+ }
+
+ @Override
+ public ProtocolVersion getProtocolVersion() {
+ return version;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return ioSession.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return ioSession.getLocalAddress();
+ }
+
+ @Override
+ public String toString() {
+ final SocketAddress remoteAddress = ioSession.getRemoteAddress();
+ final SocketAddress localAddress = ioSession.getLocalAddress();
+ final StringBuilder buffer = new StringBuilder();
+ NetUtils.formatAddress(buffer, localAddress);
+ buffer.append("->");
+ NetUtils.formatAddress(buffer, remoteAddress);
+ return buffer.toString();
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java Mon Oct 31 17:33:27 2016
@@ -34,11 +34,11 @@ import java.util.List;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpMessage;
import org.apache.hc.core5.http.MessageConstraintException;
-import org.apache.hc.core5.http.config.MessageConstraints;
+import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.message.LazyLineParser;
import org.apache.hc.core5.http.message.LineParser;
-import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.CharArrayBuffer;
@@ -62,7 +62,7 @@ public abstract class AbstractMessagePar
private int emptyLineCount;
private final LineParser lineParser;
- private final MessageConstraints messageConstraints;
+ private final H1Config messageConstraints;
/**
* Creates an instance of AbstractMessageParser.
@@ -70,14 +70,14 @@ public abstract class AbstractMessagePar
* @param lineParser the line parser. If {@code null}
* {@link org.apache.hc.core5.http.message.LazyLineParser#INSTANCE} will be used.
* @param messageConstraints Message constraints. If {@code null}
- * {@link MessageConstraints#DEFAULT} will be used.
+ * {@link H1Config#DEFAULT} will be used.
*
* @since 4.3
*/
- public AbstractMessageParser(final LineParser lineParser, final MessageConstraints messageConstraints) {
+ public AbstractMessageParser(final LineParser lineParser, final H1Config messageConstraints) {
super();
this.lineParser = lineParser != null ? lineParser : LazyLineParser.INSTANCE;
- this.messageConstraints = messageConstraints != null ? messageConstraints : MessageConstraints.DEFAULT;
+ this.messageConstraints = messageConstraints != null ? messageConstraints : H1Config.DEFAULT;
this.headerBufs = new ArrayList<>();
this.state = READ_HEAD_LINE;
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java Mon Oct 31 17:33:27 2016
@@ -36,8 +36,8 @@ import org.apache.hc.core5.http.HttpExce
import org.apache.hc.core5.http.HttpMessage;
import org.apache.hc.core5.http.message.BasicLineFormatter;
import org.apache.hc.core5.http.message.LineFormatter;
-import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.CharArrayBuffer;
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java Mon Oct 31 17:33:27 2016
@@ -39,7 +39,7 @@ import org.apache.hc.core5.http.Malforme
import org.apache.hc.core5.http.MessageConstraintException;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.TruncatedChunkException;
-import org.apache.hc.core5.http.config.MessageConstraints;
+import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.message.BufferedHeader;
import org.apache.hc.core5.http.nio.SessionInputBuffer;
@@ -66,7 +66,7 @@ public class ChunkDecoder extends Abstra
private long chunkSize;
private long pos;
- private final MessageConstraints constraints;
+ private final H1Config constraints;
private final List<CharArrayBuffer> trailerBufs;
private Header[] footers;
@@ -77,7 +77,7 @@ public class ChunkDecoder extends Abstra
public ChunkDecoder(
final ReadableByteChannel channel,
final SessionInputBuffer buffer,
- final MessageConstraints constraints,
+ final H1Config constraints,
final BasicHttpTransportMetrics metrics) {
super(channel, buffer, metrics);
this.state = READ_CONTENT;
@@ -85,7 +85,7 @@ public class ChunkDecoder extends Abstra
this.pos = 0L;
this.endOfChunk = false;
this.endOfStream = false;
- this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
+ this.constraints = constraints != null ? constraints : H1Config.DEFAULT;
this.trailerBufs = new ArrayList<>();
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java Mon Oct 31 17:33:27 2016
@@ -90,7 +90,7 @@ public class ChunkEncoder extends Abstra
while (src.hasRemaining()) {
int chunk = src.remaining();
int avail;
- avail = this.buffer.available();
+ avail = this.buffer.capacity();
// subtract the length of the longest chunk header
// 12345678\r\n
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java Mon Oct 31 17:33:27 2016
@@ -24,23 +24,23 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.core5.http2.nio;
-import java.io.IOException;
+package org.apache.hc.core5.http.impl.nio;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
/**
- * Abstract capacity update channel.
- * <p>
- * Implementations are expected to be thread-safe.
+ * {@link org.apache.hc.core5.reactor.IOEventHandler} that implements client side HTTP/1.1 messaging protocol.
*
* @since 5.0
*/
-@Contract(threading = ThreadingBehavior.SAFE)
-public interface CapacityChannel {
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+public class ClientHttp1IOEventHandler extends AbstractHttp1IOEventHandler {
- void update(int increment) throws IOException;
+ public ClientHttp1IOEventHandler(final ClientHttp1StreamDuplexer streamDuplexer) {
+ super(streamDuplexer);
+ }
}
+
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,136 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
+import org.apache.hc.core5.http.nio.NHttpMessageParserFactory;
+import org.apache.hc.core5.http.nio.NHttpMessageWriterFactory;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class ClientHttp1IOEventHandlerFactory implements IOEventHandlerFactory {
+
+ private final HttpProcessor httpProcessor;
+ private final ConnectionConfig connectionConfig;
+ private final ConnectionReuseStrategy connectionReuseStrategy;
+ private final NHttpMessageParserFactory<HttpResponse> responseParserFactory;
+ private final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory;
+ private final ContentLengthStrategy incomingContentStrategy;
+ private final ContentLengthStrategy outgoingContentStrategy;
+ private final ConnectionListener connectionListener;
+ private final Http1StreamListener streamListener;
+
+ public ClientHttp1IOEventHandlerFactory(
+ final HttpProcessor httpProcessor,
+ final ConnectionConfig connectionConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
+ final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final ConnectionListener connectionListener,
+ final Http1StreamListener streamListener) {
+ this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
+ this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
+ this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
+ DefaultConnectionReuseStrategy.INSTANCE;
+ this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
+ DefaultHttpResponseParserFactory.INSTANCE;
+ this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
+ DefaultHttpRequestWriterFactory.INSTANCE;
+ this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
+ DefaultContentLengthStrategy.INSTANCE;
+ this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
+ DefaultContentLengthStrategy.INSTANCE;
+ this.connectionListener = connectionListener;
+ this.streamListener = streamListener;
+ }
+
+ public ClientHttp1IOEventHandlerFactory(
+ final HttpProcessor httpProcessor,
+ final ConnectionConfig connectionConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
+ final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
+ final ConnectionListener connectionListener,
+ final Http1StreamListener streamListener) {
+ this(httpProcessor, connectionConfig, connectionReuseStrategy,
+ responseParserFactory, requestWriterFactory, null ,null, connectionListener, streamListener);
+ }
+
+ public ClientHttp1IOEventHandlerFactory(
+ final HttpProcessor httpProcessor,
+ final ConnectionConfig connectionConfig,
+ final ConnectionListener connectionListener,
+ final Http1StreamListener streamListener) {
+ this(httpProcessor, connectionConfig, null, null, null, connectionListener, streamListener);
+ }
+
+ public ClientHttp1IOEventHandlerFactory(
+ final HttpProcessor httpProcessor,
+ final ConnectionConfig connectionConfig) {
+ this(httpProcessor, connectionConfig, null, null);
+ }
+
+ @Override
+ public IOEventHandler createHandler(final IOSession ioSession) {
+ return new ClientHttp1IOEventHandler(createStreamDuplexer(ioSession));
+ }
+
+ protected ClientHttp1StreamDuplexer createStreamDuplexer(final IOSession ioSession) {
+ return new ClientHttp1StreamDuplexer(
+ ioSession,
+ httpProcessor,
+ H1Config.DEFAULT,
+ connectionConfig,
+ connectionReuseStrategy,
+ responseParserFactory.create(H1Config.DEFAULT),
+ requestWriterFactory.create(),
+ incomingContentStrategy,
+ outgoingContentStrategy,
+ connectionListener,
+ streamListener);
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,371 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.LengthRequiredException;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownType;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
+
+ private final HttpProcessor httpProcessor;
+ private final ConnectionReuseStrategy connectionReuseStrategy;
+ private final int fragmentSizeHint;
+ private final H1Config h1Config;
+ private final ContentLengthStrategy incomingContentStrategy;
+ private final ContentLengthStrategy outgoingContentStrategy;
+ private final Http1StreamListener streamListener;
+ private final ByteBuffer contentBuffer;
+ private final Queue<ClientHttp1StreamHandler> pipeline;
+ private final Http1StreamChannel<HttpRequest> outputChannel;
+
+ private volatile boolean inconsistent;
+ private volatile ClientHttp1StreamHandler outgoing;
+ private volatile ClientHttp1StreamHandler incoming;
+
+ public ClientHttp1StreamDuplexer(
+ final IOSession ioSession,
+ final HttpProcessor httpProcessor,
+ final H1Config h1Config,
+ final ConnectionConfig connectionConfig,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final NHttpMessageParser<HttpResponse> incomingMessageParser,
+ final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
+ final ContentLengthStrategy incomingContentStrategy,
+ final ContentLengthStrategy outgoingContentStrategy,
+ final ConnectionListener connectionListener,
+ final Http1StreamListener streamListener) {
+ super(ioSession, connectionConfig, incomingMessageParser, outgoingMessageWriter, connectionListener);
+ this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
+ final int bufferSize = connectionConfig.getBufferSize();
+ final int fragmentSizeHint = connectionConfig.getFragmentSizeHint();
+ this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize;
+ this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
+ this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
+ DefaultConnectionReuseStrategy.INSTANCE;
+ this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
+ DefaultContentLengthStrategy.INSTANCE;
+ this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
+ DefaultContentLengthStrategy.INSTANCE;
+ this.streamListener = streamListener;
+ this.contentBuffer = ByteBuffer.allocate(connectionConfig.getBufferSize());
+ this.pipeline = new ConcurrentLinkedQueue<>();
+ this.outputChannel = new Http1StreamChannel<HttpRequest>() {
+
+ @Override
+ public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
+ if (streamListener != null) {
+ streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
+ }
+ commitMessageHead(request, endStream);
+ }
+
+ @Override
+ public void update(final int increment) throws IOException {
+ if (increment > 0) {
+ requestSessionInput();
+ }
+ }
+
+ @Override
+ public void suspendInput() {
+ suspendSessionInput();
+ }
+
+ @Override
+ public void requestInput() {
+ requestSessionInput();
+ }
+
+ @Override
+ public void suspendOutput() {
+ suspendSessionOutput();
+ }
+
+ @Override
+ public void requestOutput() {
+ requestSessionOutput();
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ return streamOutput(src);
+ }
+
+ @Override
+ public void complete() throws IOException {
+ endOutputStream();
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return isOutputCompleted();
+ }
+
+ @Override
+ public void abortOutput() throws IOException {
+ final MessageDelineation messageDelineation = endOutputStream();
+ if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
+ inconsistent = true;
+ requestShutdown(ShutdownType.GRACEFUL);
+ }
+ }
+
+ @Override
+ public void activate() throws HttpException, IOException {
+ }
+
+ };
+ }
+
+ @Override
+ public void releaseResources() {
+ if (incoming != null) {
+ incoming.releaseResources();
+ incoming = null;
+ }
+ if (outgoing != null) {
+ outgoing.releaseResources();
+ outgoing = null;
+ }
+ for (;;) {
+ final ClientHttp1StreamHandler handler = pipeline.poll();
+ if (handler != null) {
+ handler.releaseResources();
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Override
+ void terminate(final Exception exception) {
+ if (incoming != null) {
+ incoming.failed(exception);
+ incoming = null;
+ }
+ if (outgoing != null) {
+ outgoing.failed(exception);
+ outgoing = null;
+ }
+ for (;;) {
+ final ClientHttp1StreamHandler handler = pipeline.poll();
+ if (handler != null) {
+ handler.failed(exception);
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Override
+ void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
+ if (response.getCode() >= 200) {
+ connMetrics.incrementRequestCount();
+ }
+ }
+
+ @Override
+ void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
+ connMetrics.incrementRequestCount();
+ }
+
+ @Override
+ protected ContentDecoder handleIncomingMessage(
+ final HttpResponse response,
+ final ReadableByteChannel channel,
+ final SessionInputBuffer buffer,
+ final BasicHttpTransportMetrics metrics) throws HttpException {
+
+ if (incoming == null) {
+ incoming = pipeline.poll();
+ }
+ if (incoming == null) {
+ throw new HttpException("Unexpected response");
+ }
+
+ if (incoming.isHeadRequest()) {
+ return null;
+ }
+ final int status = response.getCode();
+ if (status < HttpStatus.SC_SUCCESS || status == HttpStatus.SC_NO_CONTENT || status == HttpStatus.SC_NOT_MODIFIED) {
+ return null;
+ }
+ final long len = incomingContentStrategy.determineLength(response);
+ if (len >= 0) {
+ return new LengthDelimitedDecoder(channel, buffer, metrics, len);
+ } else if (len == ContentLengthStrategy.CHUNKED) {
+ return new ChunkDecoder(channel, buffer, h1Config, metrics);
+ } else {
+ return new IdentityDecoder(channel, buffer, metrics);
+ }
+ }
+
+ @Override
+ protected ContentEncoder handleOutgoingMessage(
+ final HttpRequest request,
+ final WritableByteChannel channel,
+ final SessionOutputBuffer buffer,
+ final BasicHttpTransportMetrics metrics) throws HttpException {
+ final long len = outgoingContentStrategy.determineLength(request);
+ if (len >= 0) {
+ return new LengthDelimitedEncoder(channel, buffer, metrics, len, fragmentSizeHint);
+ } else if (len == ContentLengthStrategy.CHUNKED) {
+ return new ChunkEncoder(channel, buffer, metrics, fragmentSizeHint, null);
+ } else {
+ throw new LengthRequiredException("Length required");
+ }
+ }
+
+ @Override
+ boolean inputIdle() {
+ return incoming == null;
+ }
+
+ @Override
+ boolean outputIdle() {
+ return outgoing == null && pipeline.isEmpty();
+ }
+
+ @Override
+ void outputEnd() throws HttpException, IOException {
+ if (outgoing != null) {
+ if (outgoing.isCompleted()) {
+ outgoing.releaseResources();
+ }
+ outgoing = null;
+ }
+ }
+
+ @Override
+ void execute(final ExecutionCommand executionCommand) throws HttpException, IOException {
+ final ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
+ this,
+ outputChannel,
+ httpProcessor,
+ h1Config,
+ connectionReuseStrategy,
+ executionCommand.getExchangeHandler(),
+ executionCommand.getContext(),
+ contentBuffer);
+ if (handler.isOutputReady()) {
+ handler.produceOutput();
+ }
+ pipeline.add(handler);
+ outgoing = handler;
+ }
+
+ @Override
+ boolean isOutputReady() {
+ return outgoing != null && outgoing.isOutputReady();
+ }
+
+ @Override
+ void produceOutput() throws HttpException, IOException {
+ if (outgoing != null) {
+ outgoing.produceOutput();
+ }
+ }
+
+ @Override
+ void consumeHeader(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
+ if (streamListener != null) {
+ streamListener.onResponseHead(this, response);
+ }
+ Asserts.notNull(incoming, "Response stream handler");
+ incoming.consumeHeader(response, endStream);
+ }
+
+ @Override
+ void consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+ Asserts.notNull(incoming, "Response stream handler");
+ incoming.consumeData(contentDecoder);
+ }
+
+ @Override
+ void inputEnd() throws HttpException, IOException {
+ Asserts.notNull(incoming, "Response stream handler");
+ if (incoming.isResponseCompleted()) {
+ final boolean keepAlive = !inconsistent && incoming.keepAlive();
+ if (incoming.isCompleted()) {
+ incoming.releaseResources();
+ }
+ incoming = null;
+ if (streamListener != null) {
+ streamListener.onExchangeComplete(this, keepAlive);
+ }
+ if (!keepAlive) {
+ if (outgoing != null && outgoing.isCompleted()) {
+ outgoing.releaseResources();
+ outgoing = null;
+ }
+ if (outgoing == null && pipeline.isEmpty()) {
+ requestShutdown(ShutdownType.IMMEDIATE);
+ } else {
+ doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
+ }
+ }
+ }
+ }
+
+ @Override
+ boolean handleTimeout() {
+ return outgoing != null && outgoing.handleTimeout();
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,313 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.LazyEntityDetails;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.ResourceHolder;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+
+class ClientHttp1StreamHandler implements ResourceHolder {
+
+ private final HttpConnection connection;
+ private final Http1StreamChannel<HttpRequest> outputChannel;
+ private final DataStreamChannel internalDataChannel;
+ private final HttpProcessor httpProcessor;
+ private final H1Config h1Config;
+ private final ConnectionReuseStrategy connectionReuseStrategy;
+ private final AsyncClientExchangeHandler exchangeHandler;
+ private final HttpCoreContext context;
+ private final ByteBuffer inputBuffer;
+ private final AtomicBoolean requestCommitted;
+ private final AtomicBoolean done;
+
+ private volatile int timeout;
+ private volatile HttpRequest committedRequest;
+ private volatile HttpResponse receivedResponse;
+ private volatile MessageState requestState;
+ private volatile MessageState responseState;
+
+ ClientHttp1StreamHandler(
+ final HttpConnection connection,
+ final Http1StreamChannel<HttpRequest> outputChannel,
+ final HttpProcessor httpProcessor,
+ final H1Config h1Config,
+ final ConnectionReuseStrategy connectionReuseStrategy,
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context,
+ final ByteBuffer inputBuffer) {
+ this.connection = connection;
+ this.outputChannel = outputChannel;
+ this.internalDataChannel = new DataStreamChannel() {
+
+ @Override
+ public void requestOutput() {
+ outputChannel.requestOutput();
+ }
+
+ @Override
+ public void endStream(final List<Header> trailers) throws IOException {
+ outputChannel.complete();
+ requestState = MessageState.COMPLETE;
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ return outputChannel.write(src);
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ endStream(null);
+ }
+
+ };
+
+ this.httpProcessor = httpProcessor;
+ this.h1Config = h1Config;
+ this.connectionReuseStrategy = connectionReuseStrategy;
+ this.exchangeHandler = exchangeHandler;
+ this.context = context != null ? HttpCoreContext.adapt(context) : HttpCoreContext.create();
+ this.inputBuffer = inputBuffer;
+ this.requestCommitted = new AtomicBoolean(false);
+ this.done = new AtomicBoolean(false);
+ this.requestState = MessageState.HEADERS;
+ this.responseState = MessageState.HEADERS;
+ }
+
+ boolean isResponseCompleted() {
+ return responseState == MessageState.COMPLETE;
+ }
+
+ boolean isCompleted() {
+ return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
+ }
+
+ boolean keepAlive() {
+ return committedRequest != null && receivedResponse != null &&
+ connectionReuseStrategy.keepAlive(committedRequest, receivedResponse, context);
+ }
+
+ boolean isHeadRequest() {
+ return committedRequest != null && "HEAD".equalsIgnoreCase(committedRequest.getMethod());
+ }
+
+ boolean isOutputReady() {
+ switch (requestState) {
+ case HEADERS:
+ case ACK:
+ return true;
+ case BODY:
+ return exchangeHandler.available() > 0;
+ default:
+ return false;
+ }
+ }
+
+ private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
+ if (requestCommitted.compareAndSet(false, true)) {
+ final ProtocolVersion transportVersion = request.getVersion();
+ if (transportVersion != null) {
+ context.setProtocolVersion(transportVersion);
+ }
+ context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
+ context.setAttribute(HttpCoreContext.HTTP_CONNECTION, connection);
+ httpProcessor.process(request, entityDetails, context);
+
+ final boolean endStream = entityDetails == null;
+ outputChannel.submit(request, endStream);
+ committedRequest = request;
+ if (endStream) {
+ requestState = MessageState.COMPLETE;
+ } else {
+ final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
+ final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
+ if (expectContinue) {
+ requestState = MessageState.ACK;
+ timeout = connection.getSocketTimeout();
+ connection.setSocketTimeout(h1Config.getWaitForContinueTimeout());
+ } else {
+ requestState = MessageState.BODY;
+ }
+ }
+ } else {
+ throw new HttpException("Request already committed");
+ }
+ }
+
+ void produceOutput() throws HttpException, IOException {
+ switch (requestState) {
+ case HEADERS:
+ exchangeHandler.produceRequest(new RequestChannel() {
+
+ @Override
+ public void sendRequest(
+ final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
+ commitRequest(request, entityDetails);
+ }
+
+ });
+ break;
+ case ACK:
+ outputChannel.suspendOutput();
+ break;
+ case BODY:
+ exchangeHandler.produce(internalDataChannel);
+ break;
+ }
+ }
+
+ private void validateStatus(final HttpResponse response) throws ProtocolException {
+ if (response.getCode() < HttpStatus.SC_INFORMATIONAL) {
+ throw new ProtocolException("Invalid response code");
+ }
+ if (response.getCode() < HttpStatus.SC_SUCCESS) {
+ if (response.getCode() != HttpStatus.SC_CONTINUE) {
+ throw new ProtocolException("Unsupported intermediate response code");
+ }
+ }
+ }
+
+ void consumeHeader(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
+ if (done.get() || responseState != MessageState.HEADERS) {
+ throw new ProtocolException("Unexpected message head");
+ }
+
+ validateStatus(response);
+
+ if (requestState == MessageState.ACK) {
+ connection.setSocketTimeout(timeout);
+ requestState = MessageState.BODY;
+ if (response.getCode() < HttpStatus.SC_CLIENT_ERROR) {
+ exchangeHandler.produce(internalDataChannel);
+ }
+ }
+ if (response.getCode() == HttpStatus.SC_CONTINUE) {
+ return;
+ }
+ if (requestState == MessageState.BODY) {
+ boolean keepAlive = response.getCode() < HttpStatus.SC_CLIENT_ERROR;
+ if (keepAlive) {
+ final Header h = response.getFirstHeader(HttpHeaders.CONNECTION);
+ if (h != null && HeaderElements.CLOSE.equalsIgnoreCase(h.getValue())) {
+ keepAlive = false;
+ }
+ }
+ if (!keepAlive) {
+ requestState = MessageState.COMPLETE;
+ outputChannel.abortOutput();
+ }
+ }
+
+ final EntityDetails entityDetails = endStream ? null : new LazyEntityDetails(response);
+ context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
+ httpProcessor.process(response, entityDetails, context);
+ receivedResponse = response;
+
+ exchangeHandler.consumeResponse(response, entityDetails);
+ responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
+ }
+
+ void consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+ if (done.get() || responseState != MessageState.BODY) {
+ throw new ProtocolException("Unexpected message data");
+ }
+ while (contentDecoder.read(inputBuffer) > 0) {
+ inputBuffer.flip();
+ final int capacity = exchangeHandler.consume(inputBuffer);
+ inputBuffer.clear();
+ if (capacity <= 0) {
+ if (!contentDecoder.isCompleted()) {
+ outputChannel.suspendInput();
+ exchangeHandler.updateCapacity(outputChannel);
+ }
+ break;
+ }
+ }
+ if (contentDecoder.isCompleted()) {
+ responseState = MessageState.COMPLETE;
+ exchangeHandler.streamEnd(null);
+ }
+ }
+
+ boolean handleTimeout() {
+ if (requestState == MessageState.ACK) {
+ requestState = MessageState.BODY;
+ connection.setSocketTimeout(timeout);
+ outputChannel.requestOutput();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void failed(final Exception cause) {
+ exchangeHandler.failed(cause);
+ }
+
+ @Override
+ public void releaseResources() {
+ if (done.compareAndSet(false, true)) {
+ responseState = MessageState.COMPLETE;
+ requestState = MessageState.COMPLETE;
+ exchangeHandler.releaseResources();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "[" +
+ "requestState=" + requestState +
+ ", responseState=" + responseState +
+ ']';
+ }
+
+}
+
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java Mon Oct 31 17:33:27 2016
@@ -26,14 +26,19 @@
*/
package org.apache.hc.core5.http.impl.nio;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Documented
-@Target({ElementType.TYPE})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Pipelined {
+import org.apache.hc.core5.http.HttpConnection;
+
+/**
+ * Connection event listener.
+ *
+ * @since 5.0
+ */
+public interface ConnectionListener {
+
+ void onConnect(HttpConnection connection);
+
+ void onError(HttpConnection connection, Exception ex);
+
+ void onDisconnect(HttpConnection connection);
+
}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java Mon Oct 31 17:33:27 2016
@@ -25,23 +25,23 @@
*
*/
-package org.apache.hc.core5.http.impl;
+package org.apache.hc.core5.http.impl.nio;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestFactory;
import org.apache.hc.core5.http.MethodNotSupportedException;
import org.apache.hc.core5.http.ProtocolVersion;
-import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
/**
- * Default factory for creating {@link ClassicHttpRequest} objects.
+ * Default factory for creating {@link HttpRequest} objects.
*
* @since 4.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
-public class DefaultHttpRequestFactory implements HttpRequestFactory<ClassicHttpRequest> {
+public class DefaultHttpRequestFactory implements HttpRequestFactory<HttpRequest> {
public static final DefaultHttpRequestFactory INSTANCE = new DefaultHttpRequestFactory();
@@ -67,9 +67,9 @@ public class DefaultHttpRequestFactory i
}
@Override
- public ClassicHttpRequest newHttpRequest(final ProtocolVersion transportVersion, final String method, final String uri) throws MethodNotSupportedException {
+ public HttpRequest newHttpRequest(final ProtocolVersion transportVersion, final String method, final String uri) throws MethodNotSupportedException {
if (isOneOf(SUPPORTED_METHODS, method)) {
- final ClassicHttpRequest request = new BasicClassicHttpRequest(method, uri);
+ final HttpRequest request = new BasicHttpRequest(method, uri);
request.setVersion(transportVersion);
return request;
}
@@ -77,8 +77,8 @@ public class DefaultHttpRequestFactory i
}
@Override
- public ClassicHttpRequest newHttpRequest(final String method, final String uri) throws MethodNotSupportedException {
- return new BasicClassicHttpRequest(method, uri);
+ public HttpRequest newHttpRequest(final String method, final String uri) throws MethodNotSupportedException {
+ return new BasicHttpRequest(method, uri);
}
}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain