You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/10/31 17:33:33 UTC

svn commit: r1767339 [8/14] - in /httpcomponents/httpcore/trunk: ./ httpcore5-ab/src/main/java/org/apache/hc/core5/http/benchmark/ httpcore5-ab/src/test/java/org/apache/hc/core5/http/benchmark/ httpcore5-h2/src/main/java/org/apache/hc/core5/http2/boots...

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicConnPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/BasicPoolEntry.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java Mon Oct 31 17:33:27 2016
@@ -24,7 +24,7 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.core5.http.pool.io;
+package org.apache.hc.core5.http.impl.io.pool;
 
 import java.io.IOException;
 

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/BasicPoolEntry.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/io/package-info.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java Mon Oct 31 17:33:27 2016
@@ -29,4 +29,4 @@
  * Default implementations of client side connection pools
  * for synchronous, blocking communication.
  */
-package org.apache.hc.core5.http.pool.io;
+package org.apache.hc.core5.http.impl.io.pool;

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/io/pool/package-info.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractClassicServerExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java Mon Oct 31 17:33:27 2016
@@ -24,7 +24,7 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.core5.http2.impl.nio;
+package org.apache.hc.core5.http.impl.nio;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,17 +45,18 @@ import org.apache.hc.core5.http.HttpRequ
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.impl.nio.entity.SharedInputBuffer;
+import org.apache.hc.core5.http.impl.nio.entity.SharedOutputBuffer;
 import org.apache.hc.core5.http.message.BasicHttpResponse;
 import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
 import org.apache.hc.core5.http.nio.entity.ContentInputStream;
 import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
-import org.apache.hc.core5.http2.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http2.nio.CapacityChannel;
-import org.apache.hc.core5.http2.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.nio.ExpectationChannel;
-import org.apache.hc.core5.http2.nio.ResponseChannel;
-import org.apache.hc.core5.http2.impl.nio.entity.SharedInputBuffer;
-import org.apache.hc.core5.http2.impl.nio.entity.SharedOutputBuffer;
+import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
 
@@ -71,6 +72,7 @@ public abstract class AbstractClassicSer
     private final AtomicReference<State> state;
     private final AtomicReference<Exception> exception;
 
+    private volatile HttpContext context;
     private volatile SharedInputBuffer inputBuffer;
     private volatile SharedOutputBuffer outputBuffer;
 
@@ -85,9 +87,15 @@ public abstract class AbstractClassicSer
         return exception.get();
     }
 
+    @Override
+    public void setContext(final HttpContext context) {
+        this.context = context;
+    }
+
     protected abstract void handle(
             HttpRequest request, InputStream requestStream,
-            HttpResponse response, OutputStream responseStream) throws IOException, HttpException;
+            HttpResponse response, OutputStream responseStream,
+            HttpContext context) throws IOException, HttpException;
 
     @Override
     public final void verify(
@@ -232,7 +240,7 @@ public abstract class AbstractClassicSer
                 @Override
                 public void run() {
                     try {
-                        handle(request, inputStream, responseWrapper, outputStream);
+                        handle(request, inputStream, responseWrapper, outputStream, context);
                         if (inputStream != null) {
                             inputStream.close();
                         }
@@ -253,11 +261,6 @@ public abstract class AbstractClassicSer
     }
 
     @Override
-    public final int capacity() {
-        return inputBuffer != null ? inputBuffer.available() : 0;
-    }
-
-    @Override
     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
         if (inputBuffer != null) {
             inputBuffer.updateCapacity(capacityChannel);
@@ -265,9 +268,9 @@ public abstract class AbstractClassicSer
     }
 
     @Override
-    public final void consume(final ByteBuffer src) throws IOException {
+    public final int consume(final ByteBuffer src) throws IOException {
         Asserts.notNull(inputBuffer, "Input buffer");
-        inputBuffer.fill(src);
+        return inputBuffer.fill(src);
     }
 
     @Override

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractClassicServerExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,137 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.hc.core5.http.HttpConnectionMetrics;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+
+class AbstractHttp1IOEventHandler implements HttpConnectionEventHandler {
+
+    private final AbstractHttp1StreamDuplexer<?, ?> streamDuplexer;
+
+    AbstractHttp1IOEventHandler(final AbstractHttp1StreamDuplexer<?, ?> streamDuplexer) {
+        this.streamDuplexer = Args.notNull(streamDuplexer, "Stream duplexer");
+    }
+
+    @Override
+    public void connected(final IOSession session) {
+        try {
+            streamDuplexer.onConnect();
+        } catch (final Exception ex) {
+            streamDuplexer.onException(ex);
+        }
+    }
+
+    @Override
+    public void inputReady(final IOSession session) {
+        try {
+            streamDuplexer.onInput();
+        } catch (final Exception ex) {
+            streamDuplexer.onException(ex);
+        }
+    }
+
+    @Override
+    public void outputReady(final IOSession session) {
+        try {
+            streamDuplexer.onOutput();
+        } catch (final Exception ex) {
+            streamDuplexer.onException(ex);
+        }
+    }
+
+    @Override
+    public void timeout(final IOSession session) {
+        try {
+            streamDuplexer.onTimeout();
+        } catch (final Exception ex) {
+            streamDuplexer.onException(ex);
+        }
+    }
+
+    @Override
+    public void disconnected(final IOSession session) {
+        streamDuplexer.onDisconnect();
+    }
+
+    @Override
+    public void close() throws IOException {
+        streamDuplexer.close();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+        streamDuplexer.shutdown();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return streamDuplexer.isOpen();
+    }
+
+    @Override
+    public void setSocketTimeout(final int timeout) {
+        streamDuplexer.setSocketTimeout(timeout);
+    }
+
+    @Override
+    public HttpConnectionMetrics getMetrics() {
+        return streamDuplexer.getMetrics();
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return streamDuplexer.getSocketTimeout();
+    }
+
+    @Override
+    public ProtocolVersion getProtocolVersion() {
+        return streamDuplexer.getProtocolVersion();
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return streamDuplexer.getRemoteAddress();
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return streamDuplexer.getLocalAddress();
+    }
+
+    @Override
+    public String toString() {
+        return streamDuplexer.toString();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,492 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.Deque;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpConnectionMetrics;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpMessage;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.ConnSupport;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.ResourceHolder;
+import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownType;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.HeapByteBufferAllocator;
+import org.apache.hc.core5.util.NetUtils;
+
+abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
+        implements ResourceHolder, HttpConnection {
+
+    private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
+
+    private final IOSession ioSession;
+    private final SessionInputBufferImpl inbuf;
+    private final SessionOutputBufferImpl outbuf;
+    private final BasicHttpTransportMetrics inTransportMetrics;
+    private final BasicHttpTransportMetrics outTransportMetrics;
+    private final BasicHttpConnectionMetrics connMetrics;
+    private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
+    private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
+    private final ConnectionListener connectionListener;
+    private final Lock outputLock;
+
+    private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
+    private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
+    private volatile ConnectionState connState = ConnectionState.READY;
+
+    private volatile ProtocolVersion version;
+
+    AbstractHttp1StreamDuplexer(
+            final IOSession ioSession,
+            final ConnectionConfig connectionConfig,
+            final NHttpMessageParser<IncomingMessage> incomingMessageParser,
+            final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
+            final ConnectionListener connectionListener) {
+        this.ioSession = Args.notNull(ioSession, "I/O session");
+        final int bufferSize = connectionConfig.getBufferSize();
+        this.inbuf = new SessionInputBufferImpl(bufferSize,
+                bufferSize < 512 ? bufferSize : 512,
+                ConnSupport.createDecoder(connectionConfig),
+                HeapByteBufferAllocator.INSTANCE);
+        this.outbuf = new SessionOutputBufferImpl(bufferSize,
+                bufferSize < 512 ? bufferSize : 512,
+                ConnSupport.createEncoder(connectionConfig),
+                HeapByteBufferAllocator.INSTANCE);
+        this.inTransportMetrics = new BasicHttpTransportMetrics();
+        this.outTransportMetrics = new BasicHttpTransportMetrics();
+        this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
+        this.incomingMessageParser = incomingMessageParser;
+        this.outgoingMessageWriter = outgoingMessageWriter;
+        this.connectionListener = connectionListener;
+        this.outputLock = new ReentrantLock();
+        this.connState = ConnectionState.READY;
+    }
+
+    void doTerminate(final Exception exception) {
+        connState = ConnectionState.SHUTDOWN;
+        try {
+            terminate(exception);
+        } finally {
+            ioSession.shutdown();
+        }
+    }
+
+    abstract void terminate(final Exception exception);
+
+    abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
+
+    abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
+
+    abstract void consumeHeader(IncomingMessage messageHead, boolean endStream) throws HttpException, IOException;
+
+    abstract ContentDecoder handleIncomingMessage(
+            IncomingMessage incomingMessage,
+            ReadableByteChannel channel,
+            SessionInputBuffer buffer,
+            BasicHttpTransportMetrics metrics) throws HttpException;
+
+    abstract ContentEncoder handleOutgoingMessage(
+            OutgoingMessage outgoingMessage,
+            WritableByteChannel channel,
+            SessionOutputBuffer buffer,
+            BasicHttpTransportMetrics metrics) throws HttpException;
+
+    abstract void consumeData(ContentDecoder contentDecoder) throws HttpException, IOException;
+
+    abstract boolean isOutputReady();
+
+    abstract void produceOutput() throws HttpException, IOException;
+
+    abstract void execute(ExecutionCommand executionCommand) throws HttpException, IOException;
+
+    abstract void inputEnd() throws HttpException, IOException;
+
+    abstract void outputEnd() throws HttpException, IOException;
+
+    abstract boolean inputIdle();
+
+    abstract boolean outputIdle();
+
+    abstract boolean handleTimeout();
+
+    private void processCommands() throws HttpException, IOException {
+        for (;;) {
+            final Command command = ioSession.getCommandQueue().poll();
+            if (command == null) {
+                return;
+            }
+            if (command instanceof ShutdownCommand) {
+                final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
+                requestShutdown(shutdownCommand.getType());
+            } else if (command instanceof ExecutionCommand) {
+                if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
+                    command.cancel();
+                } else {
+                    execute((ExecutionCommand) command);
+                    return;
+                }
+            } else {
+                throw new HttpException("Unexpected command: " + command.getClass());
+            }
+        }
+    }
+
+    public final void onConnect() throws HttpException, IOException {
+        if (connectionListener != null) {
+            connectionListener.onConnect(this);
+        }
+        connState = ConnectionState.ACTIVE;
+        processCommands();
+    }
+
+    public final void onInput() throws HttpException, IOException {
+        do {
+            if (incomingMessage == null) {
+
+                if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle()) {
+                    ioSession.clearEvent(SelectionKey.OP_READ);
+                    return;
+                }
+
+                int bytesRead;
+                do {
+                    bytesRead = inbuf.fill(ioSession.channel());
+                    if (bytesRead > 0) {
+                        inTransportMetrics.incrementBytesTransferred(bytesRead);
+                    }
+                    final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, bytesRead == -1);
+                    if (messageHead != null) {
+                        incomingMessageParser.reset();
+
+                        this.version = messageHead.getVersion();
+
+                        updateInputMetrics(messageHead, connMetrics);
+                        final ContentDecoder contentDecoder = handleIncomingMessage(messageHead, ioSession.channel(), inbuf, inTransportMetrics);
+                        consumeHeader(messageHead, contentDecoder == null);
+                        if (contentDecoder != null) {
+                            incomingMessage = new Message<>(messageHead, contentDecoder);
+                            break;
+                        } else {
+                            inputEnd();
+                        }
+                    }
+                } while (bytesRead > 0);
+
+                if (bytesRead == -1 && !inbuf.hasData()) {
+                    if (incomingMessage == null && outgoingMessage == null) {
+                        requestShutdown(ShutdownType.IMMEDIATE);
+                    } else {
+                        doTerminate(new ConnectionClosedException("Connection closed by peer"));
+                    }
+                    return;
+                }
+            }
+
+            if (incomingMessage != null) {
+                final ContentDecoder contentDecoder = incomingMessage.getBody();
+                consumeData(contentDecoder);
+                if (contentDecoder.isCompleted()) {
+                    incomingMessage = null;
+                    inputEnd();
+                }
+            }
+        } while (connState.compareTo(ConnectionState.SHUTDOWN) < 0 && inbuf.hasData());
+    }
+
+    public final void onOutput() throws IOException, HttpException {
+        outputLock.lock();
+        try {
+            if (outbuf.hasData()) {
+                final int bytesWritten = outbuf.flush(ioSession.channel());
+                if (bytesWritten > 0) {
+                    outTransportMetrics.incrementBytesTransferred(bytesWritten);
+                }
+            }
+        } finally {
+            outputLock.unlock();
+        }
+        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) < 0) {
+            if (isOutputReady()) {
+                produceOutput();
+            } else {
+                outputLock.lock();
+                try {
+                    if (!outbuf.hasData()) {
+                        ioSession.clearEvent(SelectionKey.OP_WRITE);
+                    }
+                } finally {
+                    outputLock.unlock();
+                }
+            }
+            outputLock.lock();
+            final boolean outputEnd;
+            try {
+                outputEnd = outgoingMessage == null && !outbuf.hasData();
+            } finally {
+                outputLock.unlock();
+            }
+            if (outputEnd) {
+                outputEnd();
+                processCommands();
+            }
+        }
+        outputLock.lock();
+        try {
+            if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
+                connState = ConnectionState.SHUTDOWN;
+            }
+            if (!outbuf.hasData() && connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
+                ioSession.close();
+                cancelPendingCommands();
+                releaseResources();
+            }
+        } finally {
+            outputLock.unlock();
+        }
+    }
+
+    public final void onTimeout() throws IOException, HttpException {
+        if (!handleTimeout()) {
+            doTerminate(new SocketTimeoutException());
+        }
+    }
+
+    public final void onException(final Exception ex) {
+        doTerminate(ex);
+        if (connectionListener != null) {
+            connectionListener.onError(this, ex);
+        }
+    }
+
+    public final void onDisconnect() {
+        cancelPendingCommands();
+        releaseResources();
+        if (connectionListener != null) {
+            connectionListener.onDisconnect(this);
+        }
+    }
+
+    private void cancelPendingCommands() {
+        final Deque<Command> commandQueue = ioSession.getCommandQueue();
+        for (;;) {
+            final Command command = commandQueue.poll();
+            if (command != null) {
+                command.cancel();
+            } else {
+                break;
+            }
+        }
+    }
+
+    void requestShutdown(final ShutdownType shutdownType) {
+        switch (shutdownType) {
+            case GRACEFUL:
+                if (connState == ConnectionState.ACTIVE) {
+                    connState = ConnectionState.GRACEFUL_SHUTDOWN;
+                }
+                break;
+            case IMMEDIATE:
+                connState = ConnectionState.SHUTDOWN;
+                break;
+        }
+        ioSession.setEvent(SelectionKey.OP_WRITE);
+    }
+
+    void commitMessageHead(final OutgoingMessage messageHead, final boolean endStream) throws HttpException, IOException {
+        outputLock.lock();
+        try {
+            outgoingMessageWriter.write(messageHead, outbuf);
+            updateOutputMetrics(messageHead, connMetrics);
+            if (!endStream) {
+                final ContentEncoder contentEncoder = handleOutgoingMessage(messageHead, ioSession.channel(), outbuf, outTransportMetrics);
+                if (contentEncoder != null) {
+                    outgoingMessage = new Message<>(messageHead, contentEncoder);
+                }
+            }
+            outgoingMessageWriter.reset();
+            ioSession.setEvent(EventMask.WRITE);
+        } finally {
+            outputLock.unlock();
+        }
+    }
+
+    void requestSessionInput() {
+        ioSession.setEvent(SelectionKey.OP_READ);
+    }
+
+    void suspendSessionInput() {
+        ioSession.clearEvent(SelectionKey.OP_READ);
+    }
+
+    void requestSessionOutput() {
+        ioSession.setEvent(SelectionKey.OP_WRITE);
+    }
+
+    void suspendSessionOutput() {
+        ioSession.clearEvent(SelectionKey.OP_WRITE);
+    }
+
+    int streamOutput(final ByteBuffer src) throws IOException {
+        outputLock.lock();
+        try {
+            // No message body in progress: report the output side as closed.
+            if (outgoingMessage == null) {
+                throw new ClosedChannelException();
+            }
+            final int count = outgoingMessage.getBody().write(src);
+            if (count > 0) {
+                ioSession.setEvent(SelectionKey.OP_WRITE);
+            }
+            return count;
+        } finally {
+            outputLock.unlock();
+        }
+    }
+
+    enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
+
+    MessageDelineation endOutputStream() throws IOException {
+        outputLock.lock();
+        try {
+            if (outgoingMessage == null) {
+                return MessageDelineation.NONE;
+            }
+            final ContentEncoder encoder = outgoingMessage.getBody();
+            encoder.complete();
+            ioSession.setEvent(SelectionKey.OP_WRITE);
+            outgoingMessage = null;
+            // CHUNK_CODED when the body went through a ChunkEncoder,
+            // MESSAGE_HEAD for any other encoder type.
+            return encoder instanceof ChunkEncoder
+                    ? MessageDelineation.CHUNK_CODED
+                    : MessageDelineation.MESSAGE_HEAD;
+        } finally {
+            outputLock.unlock();
+        }
+    }
+
+    boolean isOutputCompleted() {
+        outputLock.lock();
+        try {
+            // No message body in progress counts as completed.
+            if (outgoingMessage != null) {
+                return outgoingMessage.getBody().isCompleted();
+            }
+            return true;
+        } finally {
+            outputLock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL)); // NOTE(review): head of the queue, i.e. ahead of commands already pending; confirm that is intended for a graceful close
+        ioSession.setEvent(SelectionKey.OP_WRITE); // set write interest after queuing the command
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+        ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE)); // inserted at the head of the queue, ahead of pending commands
+        ioSession.setEvent(SelectionKey.OP_WRITE); // set write interest after queuing the command
+    }
+
+    @Override
+    public boolean isOpen() {
+        return connState == ConnectionState.ACTIVE;
+    }
+
+    @Override
+    public void setSocketTimeout(final int timeout) {
+        ioSession.setSocketTimeout(timeout);
+    }
+
+    @Override
+    public HttpConnectionMetrics getMetrics() {
+        return connMetrics;
+    }
+
+    @Override
+    public int getSocketTimeout() {
+        return ioSession.getSocketTimeout();
+    }
+
+    @Override
+    public ProtocolVersion getProtocolVersion() {
+        return version;
+    }
+
+    @Override
+    public SocketAddress getRemoteAddress() {
+        return ioSession.getRemoteAddress();
+    }
+
+    @Override
+    public SocketAddress getLocalAddress() {
+        return ioSession.getLocalAddress();
+    }
+
+    @Override
+    public String toString() {
+        final SocketAddress remoteAddress = ioSession.getRemoteAddress();
+        final SocketAddress localAddress = ioSession.getLocalAddress();
+        final StringBuilder buffer = new StringBuilder();
+        NetUtils.formatAddress(buffer, localAddress);
+        buffer.append("->");
+        NetUtils.formatAddress(buffer, remoteAddress);
+        return buffer.toString();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageParser.java Mon Oct 31 17:33:27 2016
@@ -34,11 +34,11 @@ import java.util.List;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpMessage;
 import org.apache.hc.core5.http.MessageConstraintException;
-import org.apache.hc.core5.http.config.MessageConstraints;
+import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.message.LazyLineParser;
 import org.apache.hc.core5.http.message.LineParser;
-import org.apache.hc.core5.http.nio.NHttpMessageParser;
 import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.CharArrayBuffer;
 
@@ -62,7 +62,7 @@ public abstract class AbstractMessagePar
     private int emptyLineCount;
 
     private final LineParser lineParser;
-    private final MessageConstraints messageConstraints;
+    private final H1Config messageConstraints;
 
     /**
      * Creates an instance of AbstractMessageParser.
@@ -70,14 +70,14 @@ public abstract class AbstractMessagePar
      * @param lineParser the line parser. If {@code null}
      *   {@link org.apache.hc.core5.http.message.LazyLineParser#INSTANCE} will be used.
      * @param messageConstraints Message constraints. If {@code null}
-     *   {@link MessageConstraints#DEFAULT} will be used.
+     *   {@link H1Config#DEFAULT} will be used.
      *
      * @since 4.3
      */
-    public AbstractMessageParser(final LineParser lineParser, final MessageConstraints messageConstraints) {
+    public AbstractMessageParser(final LineParser lineParser, final H1Config messageConstraints) {
         super();
         this.lineParser = lineParser != null ? lineParser : LazyLineParser.INSTANCE;
-        this.messageConstraints = messageConstraints != null ? messageConstraints : MessageConstraints.DEFAULT;
+        this.messageConstraints = messageConstraints != null ? messageConstraints : H1Config.DEFAULT;
         this.headerBufs = new ArrayList<>();
         this.state = READ_HEAD_LINE;
     }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractMessageWriter.java Mon Oct 31 17:33:27 2016
@@ -36,8 +36,8 @@ import org.apache.hc.core5.http.HttpExce
 import org.apache.hc.core5.http.HttpMessage;
 import org.apache.hc.core5.http.message.BasicLineFormatter;
 import org.apache.hc.core5.http.message.LineFormatter;
-import org.apache.hc.core5.http.nio.NHttpMessageWriter;
 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.CharArrayBuffer;
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkDecoder.java Mon Oct 31 17:33:27 2016
@@ -39,7 +39,7 @@ import org.apache.hc.core5.http.Malforme
 import org.apache.hc.core5.http.MessageConstraintException;
 import org.apache.hc.core5.http.ParseException;
 import org.apache.hc.core5.http.TruncatedChunkException;
-import org.apache.hc.core5.http.config.MessageConstraints;
+import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
 import org.apache.hc.core5.http.message.BufferedHeader;
 import org.apache.hc.core5.http.nio.SessionInputBuffer;
@@ -66,7 +66,7 @@ public class ChunkDecoder extends Abstra
     private long chunkSize;
     private long pos;
 
-    private final MessageConstraints constraints;
+    private final H1Config constraints;
     private final List<CharArrayBuffer> trailerBufs;
 
     private Header[] footers;
@@ -77,7 +77,7 @@ public class ChunkDecoder extends Abstra
     public ChunkDecoder(
             final ReadableByteChannel channel,
             final SessionInputBuffer buffer,
-            final MessageConstraints constraints,
+            final H1Config constraints,
             final BasicHttpTransportMetrics metrics) {
         super(channel, buffer, metrics);
         this.state = READ_CONTENT;
@@ -85,7 +85,7 @@ public class ChunkDecoder extends Abstra
         this.pos = 0L;
         this.endOfChunk = false;
         this.endOfStream = false;
-        this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
+        this.constraints = constraints != null ? constraints : H1Config.DEFAULT;
         this.trailerBufs = new ArrayList<>();
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ChunkEncoder.java Mon Oct 31 17:33:27 2016
@@ -90,7 +90,7 @@ public class ChunkEncoder extends Abstra
         while (src.hasRemaining()) {
             int chunk = src.remaining();
             int avail;
-            avail = this.buffer.available();
+            avail = this.buffer.capacity();
 
             // subtract the length of the longest chunk header
             // 12345678\r\n

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/CapacityChannel.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandler.java Mon Oct 31 17:33:27 2016
@@ -24,23 +24,23 @@
  * <http://www.apache.org/>.
  *
  */
-package org.apache.hc.core5.http2.nio;
 
-import java.io.IOException;
+package org.apache.hc.core5.http.impl.nio;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 
 /**
- * Abstract capacity update channel.
- * <p>
- * Implementations are expected to be thread-safe.
+ * {@link org.apache.hc.core5.reactor.IOEventHandler} that implements client side HTTP/1.1 messaging protocol.
  *
  * @since 5.0
  */
-@Contract(threading = ThreadingBehavior.SAFE)
-public interface CapacityChannel {
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+public class ClientHttp1IOEventHandler extends AbstractHttp1IOEventHandler {
 
-    void update(int increment) throws IOException;
+    public ClientHttp1IOEventHandler(final ClientHttp1StreamDuplexer streamDuplexer) {
+        super(streamDuplexer);
+    }
 
 }
+

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,136 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
+import org.apache.hc.core5.http.nio.NHttpMessageParserFactory;
+import org.apache.hc.core5.http.nio.NHttpMessageWriterFactory;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
+public class ClientHttp1IOEventHandlerFactory implements IOEventHandlerFactory {
+
+    private final HttpProcessor httpProcessor;
+    private final ConnectionConfig connectionConfig;
+    private final ConnectionReuseStrategy connectionReuseStrategy;
+    private final NHttpMessageParserFactory<HttpResponse> responseParserFactory;
+    private final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory;
+    private final ContentLengthStrategy incomingContentStrategy;
+    private final ContentLengthStrategy outgoingContentStrategy;
+    private final ConnectionListener connectionListener;
+    private final Http1StreamListener streamListener;
+
+    public ClientHttp1IOEventHandlerFactory(
+            final HttpProcessor httpProcessor,
+            final ConnectionConfig connectionConfig,
+            final ConnectionReuseStrategy connectionReuseStrategy,
+            final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
+            final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
+            final ContentLengthStrategy incomingContentStrategy,
+            final ContentLengthStrategy outgoingContentStrategy,
+            final ConnectionListener connectionListener,
+            final Http1StreamListener streamListener) {
+        this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); // the only argument checked for null
+        this.connectionConfig = connectionConfig !=  null ? connectionConfig : ConnectionConfig.DEFAULT;
+        this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
+                DefaultConnectionReuseStrategy.INSTANCE;
+        this.responseParserFactory = responseParserFactory != null ? responseParserFactory :
+                DefaultHttpResponseParserFactory.INSTANCE;
+        this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory :
+                DefaultHttpRequestWriterFactory.INSTANCE;
+        this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
+                DefaultContentLengthStrategy.INSTANCE;
+        this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
+                DefaultContentLengthStrategy.INSTANCE;
+        this.connectionListener = connectionListener; // stored as given (no null default)
+        this.streamListener = streamListener; // stored as given (no null default)
+    }
+
+    public ClientHttp1IOEventHandlerFactory(
+            final HttpProcessor httpProcessor,
+            final ConnectionConfig connectionConfig,
+            final ConnectionReuseStrategy connectionReuseStrategy,
+            final NHttpMessageParserFactory<HttpResponse> responseParserFactory,
+            final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
+            final ConnectionListener connectionListener,
+            final Http1StreamListener streamListener) {
+        this(httpProcessor, connectionConfig, connectionReuseStrategy,
+                responseParserFactory, requestWriterFactory, null ,null, connectionListener, streamListener); // null content length strategies select DefaultContentLengthStrategy.INSTANCE
+    }
+
+    public ClientHttp1IOEventHandlerFactory(
+            final HttpProcessor httpProcessor,
+            final ConnectionConfig connectionConfig,
+            final ConnectionListener connectionListener,
+            final Http1StreamListener streamListener) {
+        this(httpProcessor, connectionConfig, null, null, null, connectionListener, streamListener); // null reuse strategy and parser/writer factories select the defaults
+    }
+
+    public ClientHttp1IOEventHandlerFactory(
+            final HttpProcessor httpProcessor,
+            final ConnectionConfig connectionConfig) {
+        this(httpProcessor, connectionConfig, null, null); // no connection listener, no stream listener
+    }
+
+    @Override
+    public IOEventHandler createHandler(final IOSession ioSession) {
+        return new ClientHttp1IOEventHandler(createStreamDuplexer(ioSession)); // wraps a newly created stream duplexer
+    }
+
+    protected ClientHttp1StreamDuplexer createStreamDuplexer(final IOSession ioSession) {
+        return new ClientHttp1StreamDuplexer(
+                ioSession,
+                httpProcessor,
+                H1Config.DEFAULT, // NOTE(review): fixed to the default; no constructor of this factory accepts an H1Config
+                connectionConfig,
+                connectionReuseStrategy,
+                responseParserFactory.create(H1Config.DEFAULT), // parser is created with the same fixed default config
+                requestWriterFactory.create(),
+                incomingContentStrategy,
+                outgoingContentStrategy,
+                connectionListener,
+                streamListener);
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,371 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.LengthRequiredException;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.SessionInputBuffer;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.command.ExecutionCommand;
+import org.apache.hc.core5.http.nio.command.ShutdownType;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
+
+    private final HttpProcessor httpProcessor;
+    private final ConnectionReuseStrategy connectionReuseStrategy;
+    private final int fragmentSizeHint;
+    private final H1Config h1Config;
+    private final ContentLengthStrategy incomingContentStrategy;
+    private final ContentLengthStrategy outgoingContentStrategy;
+    private final Http1StreamListener streamListener;
+    private final ByteBuffer contentBuffer;
+    private final Queue<ClientHttp1StreamHandler> pipeline;
+    private final Http1StreamChannel<HttpRequest> outputChannel;
+
+    private volatile boolean inconsistent;
+    private volatile ClientHttp1StreamHandler outgoing;
+    private volatile ClientHttp1StreamHandler incoming;
+
+    public ClientHttp1StreamDuplexer(
+            final IOSession ioSession,
+            final HttpProcessor httpProcessor,
+            final H1Config h1Config,
+            final ConnectionConfig connectionConfig,
+            final ConnectionReuseStrategy connectionReuseStrategy,
+            final NHttpMessageParser<HttpResponse> incomingMessageParser,
+            final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
+            final ContentLengthStrategy incomingContentStrategy,
+            final ContentLengthStrategy outgoingContentStrategy,
+            final ConnectionListener connectionListener,
+            final Http1StreamListener streamListener) {
+        super(ioSession, connectionConfig, incomingMessageParser, outgoingMessageWriter, connectionListener);
+        this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
+        final int bufferSize = connectionConfig.getBufferSize();
+        final int fragmentSizeHint = connectionConfig.getFragmentSizeHint();
+        this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize;
+        this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
+        this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
+                DefaultConnectionReuseStrategy.INSTANCE;
+        this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
+                DefaultContentLengthStrategy.INSTANCE;
+        this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
+                DefaultContentLengthStrategy.INSTANCE;
+        this.streamListener = streamListener;
+        this.contentBuffer = ByteBuffer.allocate(connectionConfig.getBufferSize());
+        this.pipeline = new ConcurrentLinkedQueue<>();
+        this.outputChannel = new Http1StreamChannel<HttpRequest>() {
+
+            @Override
+            public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
+                if (streamListener != null) {
+                    streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
+                }
+                commitMessageHead(request, endStream);
+            }
+
+            @Override
+            public void update(final int increment) throws IOException {
+                if (increment > 0) {
+                    requestSessionInput();
+                }
+            }
+
+            @Override
+            public void suspendInput() {
+                suspendSessionInput();
+            }
+
+            @Override
+            public void requestInput() {
+                requestSessionInput();
+            }
+
+            @Override
+            public void suspendOutput() {
+                suspendSessionOutput();
+            }
+
+            @Override
+            public void requestOutput() {
+                requestSessionOutput();
+            }
+
+            @Override
+            public int write(final ByteBuffer src) throws IOException {
+                return streamOutput(src);
+            }
+
+            @Override
+            public void complete() throws IOException {
+                endOutputStream();
+            }
+
+            @Override
+            public boolean isCompleted() {
+                return isOutputCompleted();
+            }
+
+            @Override
+            public void abortOutput() throws IOException {
+                final MessageDelineation messageDelineation = endOutputStream();
+                if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
+                    inconsistent = true;
+                    requestShutdown(ShutdownType.GRACEFUL);
+                }
+            }
+
+            @Override
+            public void activate() throws HttpException, IOException {
+            }
+
+        };
+    }
+
+    @Override
+    public void releaseResources() {
+        if (incoming != null) {
+            incoming.releaseResources();
+            incoming = null;
+        }
+        if (outgoing != null) {
+            outgoing.releaseResources();
+            outgoing = null;
+        }
+        // Handlers still waiting in the pipeline are released as well;
+        // poll() removes each one and yields null once the queue is
+        // empty.
+        ClientHttp1StreamHandler queued = pipeline.poll();
+        while (queued != null) {
+            queued.releaseResources();
+            queued = pipeline.poll();
+        }
+    }
+
+    @Override
+    void terminate(final Exception exception) {
+        if (incoming != null) {
+            incoming.failed(exception);
+            incoming = null;
+        }
+        if (outgoing != null) {
+            outgoing.failed(exception);
+            outgoing = null;
+        }
+        // Handlers still waiting in the pipeline are failed with the
+        // same exception; poll() removes each one and yields null once
+        // the queue is empty.
+        ClientHttp1StreamHandler queued = pipeline.poll();
+        while (queued != null) {
+            queued.failed(exception);
+            queued = pipeline.poll();
+        }
+    }
+
+    @Override
+    void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
+        if (response.getCode() >= 200) { // status codes below 200 are not counted
+            connMetrics.incrementResponseCount(); // inbound message is a response; updateOutputMetrics already counts the request
+        }
+    }
+
+    @Override
+    void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
+        connMetrics.incrementRequestCount();
+    }
+
+    @Override
+    protected ContentDecoder handleIncomingMessage(
+            final HttpResponse response,
+            final ReadableByteChannel channel,
+            final SessionInputBuffer buffer,
+            final BasicHttpTransportMetrics metrics) throws HttpException {
+
+        if (incoming == null) {
+            incoming = pipeline.poll(); // take the next queued handler, if any
+        }
+        if (incoming == null) {
+            throw new HttpException("Unexpected response"); // a response head arrived with no handler waiting for it
+        }
+
+        if (incoming.isHeadRequest()) {
+            return null; // no decoder is created when the handler reports a HEAD request
+        }
+        final int status = response.getCode();
+        if (status < HttpStatus.SC_SUCCESS || status == HttpStatus.SC_NO_CONTENT || status == HttpStatus.SC_NOT_MODIFIED) {
+            return null; // no decoder is created for these status codes
+        }
+        final long len = incomingContentStrategy.determineLength(response);
+        if (len >= 0) {
+            return new LengthDelimitedDecoder(channel, buffer, metrics, len);
+        } else if (len == ContentLengthStrategy.CHUNKED) {
+            return new ChunkDecoder(channel, buffer, h1Config, metrics);
+        } else {
+            return new IdentityDecoder(channel, buffer, metrics); // any other negative length value
+        }
+    }
+
+    /**
+     * Selects the encoder for the body of an outgoing request based on the length
+     * reported by the outgoing content length strategy.
+     *
+     * @return a length delimited encoder for a non-negative length, a chunk encoder
+     *         for chunked content.
+     * @throws LengthRequiredException if the length is neither non-negative nor chunked.
+     */
+    @Override
+    protected ContentEncoder handleOutgoingMessage(
+            final HttpRequest request,
+            final WritableByteChannel channel,
+            final SessionOutputBuffer buffer,
+            final BasicHttpTransportMetrics metrics) throws HttpException {
+        final long contentLength = outgoingContentStrategy.determineLength(request);
+        if (contentLength >= 0) {
+            return new LengthDelimitedEncoder(channel, buffer, metrics, contentLength, fragmentSizeHint);
+        }
+        if (contentLength != ContentLengthStrategy.CHUNKED) {
+            throw new LengthRequiredException("Length required");
+        }
+        return new ChunkEncoder(channel, buffer, metrics, fragmentSizeHint, null);
+    }
+
+    @Override
+    boolean inputIdle() {
+        return incoming == null;
+    }
+
+    /**
+     * Reports whether the output side is idle: there is no outgoing handler and
+     * the pipeline holds no queued handlers.
+     */
+    @Override
+    boolean outputIdle() {
+        if (outgoing != null) {
+            return false;
+        }
+        return pipeline.isEmpty();
+    }
+
+    @Override
+    void outputEnd() throws HttpException, IOException {
+        if (outgoing != null) {
+            if (outgoing.isCompleted()) {
+                outgoing.releaseResources();
+            }
+            outgoing = null;
+        }
+    }
+
+    /**
+     * Starts a new message exchange for the given command: creates its stream
+     * handler, registers it as the outgoing handler and at the tail of the
+     * pipeline, and lets it produce output if it is ready to do so.
+     *
+     * @param executionCommand supplies the exchange handler and the context of the exchange.
+     * @throws HttpException if thrown while the handler produces its output.
+     * @throws IOException if thrown while the handler produces its output.
+     */
+    @Override
+    void execute(final ExecutionCommand executionCommand) throws HttpException, IOException {
+        final ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
+                this,
+                outputChannel,
+                httpProcessor,
+                h1Config,
+                connectionReuseStrategy,
+                executionCommand.getExchangeHandler(),
+                executionCommand.getContext(),
+                contentBuffer);
+        // Register the handler before it produces any output. terminate() only fails
+        // handlers reachable through 'outgoing' and the pipeline, so a handler whose
+        // produceOutput() throws must already be registered for its exchange handler
+        // to be notified of the failure.
+        pipeline.add(handler);
+        outgoing = handler;
+        if (handler.isOutputReady()) {
+            handler.produceOutput();
+        }
+    }
+
+    @Override
+    boolean isOutputReady() {
+        return outgoing != null && outgoing.isOutputReady();
+    }
+
+    @Override
+    void produceOutput() throws HttpException, IOException {
+        if (outgoing != null) {
+            outgoing.produceOutput();
+        }
+    }
+
+    @Override
+    void consumeHeader(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
+        if (streamListener != null) {
+            streamListener.onResponseHead(this, response);
+        }
+        Asserts.notNull(incoming, "Response stream handler");
+        incoming.consumeHeader(response, endStream);
+    }
+
+    @Override
+    void consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+        Asserts.notNull(incoming, "Response stream handler");
+        incoming.consumeData(contentDecoder);
+    }
+
+    /**
+     * Finishes processing of the current incoming message.
+     * <p>
+     * Nothing happens until the incoming handler reports its response as completed.
+     * After that the handler is detached (and released if the whole exchange is
+     * completed) and the stream listener, if set, is told that the exchange is
+     * complete. When the connection cannot be kept alive it is either shut down
+     * immediately, if no other exchange is pending, or terminated with a
+     * {@code ConnectionClosedException}.
+     */
+    @Override
+    void inputEnd() throws HttpException, IOException {
+        Asserts.notNull(incoming, "Response stream handler");
+        if (!incoming.isResponseCompleted()) {
+            return;
+        }
+        // The keep-alive decision must be taken before the handler is released and detached.
+        final boolean keepAlive = !inconsistent && incoming.keepAlive();
+        if (incoming.isCompleted()) {
+            incoming.releaseResources();
+        }
+        incoming = null;
+        if (streamListener != null) {
+            streamListener.onExchangeComplete(this, keepAlive);
+        }
+        if (keepAlive) {
+            return;
+        }
+        if (outgoing != null && outgoing.isCompleted()) {
+            outgoing.releaseResources();
+            outgoing = null;
+        }
+        final boolean nothingPending = outgoing == null && pipeline.isEmpty();
+        if (nothingPending) {
+            requestShutdown(ShutdownType.IMMEDIATE);
+        } else {
+            doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
+        }
+    }
+
+    @Override
+    boolean handleTimeout() {
+        return outgoing != null && outgoing.handleTimeout();
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,313 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.impl.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.LazyEntityDetails;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.ResourceHolder;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+
+class ClientHttp1StreamHandler implements ResourceHolder {
+
+    private final HttpConnection connection;
+    private final Http1StreamChannel<HttpRequest> outputChannel;
+    private final DataStreamChannel internalDataChannel;
+    private final HttpProcessor httpProcessor;
+    private final H1Config h1Config;
+    private final ConnectionReuseStrategy connectionReuseStrategy;
+    private final AsyncClientExchangeHandler exchangeHandler;
+    private final HttpCoreContext context;
+    private final ByteBuffer inputBuffer;
+    private final AtomicBoolean requestCommitted;
+    private final AtomicBoolean done;
+
+    private volatile int timeout;
+    private volatile HttpRequest committedRequest;
+    private volatile HttpResponse receivedResponse;
+    private volatile MessageState requestState;
+    private volatile MessageState responseState;
+
+    ClientHttp1StreamHandler(
+            final HttpConnection connection,
+            final Http1StreamChannel<HttpRequest> outputChannel,
+            final HttpProcessor httpProcessor,
+            final H1Config h1Config,
+            final ConnectionReuseStrategy connectionReuseStrategy,
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context,
+            final ByteBuffer inputBuffer) {
+        this.connection = connection;
+        this.outputChannel = outputChannel;
+        this.internalDataChannel = new DataStreamChannel() {
+
+            @Override
+            public void requestOutput() {
+                outputChannel.requestOutput();
+            }
+
+            @Override
+            public void endStream(final List<Header> trailers) throws IOException {
+                outputChannel.complete();
+                requestState = MessageState.COMPLETE;
+            }
+
+            @Override
+            public int write(final ByteBuffer src) throws IOException {
+                return outputChannel.write(src);
+            }
+
+            @Override
+            public void endStream() throws IOException {
+                endStream(null);
+            }
+
+        };
+
+        this.httpProcessor = httpProcessor;
+        this.h1Config = h1Config;
+        this.connectionReuseStrategy = connectionReuseStrategy;
+        this.exchangeHandler = exchangeHandler;
+        this.context = context != null ? HttpCoreContext.adapt(context) : HttpCoreContext.create();
+        this.inputBuffer = inputBuffer;
+        this.requestCommitted = new AtomicBoolean(false);
+        this.done = new AtomicBoolean(false);
+        this.requestState = MessageState.HEADERS;
+        this.responseState = MessageState.HEADERS;
+    }
+
+    boolean isResponseCompleted() {
+        return responseState == MessageState.COMPLETE;
+    }
+
+    boolean isCompleted() {
+        return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
+    }
+
+    boolean keepAlive() {
+        return committedRequest != null && receivedResponse != null &&
+                connectionReuseStrategy.keepAlive(committedRequest, receivedResponse, context);
+    }
+
+    boolean isHeadRequest() {
+        return committedRequest != null && "HEAD".equalsIgnoreCase(committedRequest.getMethod());
+    }
+
+    boolean isOutputReady() {
+        switch (requestState) {
+            case HEADERS:
+            case ACK:
+                return true;
+            case BODY:
+                return exchangeHandler.available() > 0;
+            default:
+                return false;
+        }
+    }
+
+    private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
+        if (requestCommitted.compareAndSet(false, true)) {
+            final ProtocolVersion transportVersion = request.getVersion();
+            if (transportVersion != null) {
+                context.setProtocolVersion(transportVersion);
+            }
+            context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
+            context.setAttribute(HttpCoreContext.HTTP_CONNECTION, connection);
+            httpProcessor.process(request, entityDetails, context);
+
+            final boolean endStream = entityDetails == null;
+            outputChannel.submit(request, endStream);
+            committedRequest = request;
+            if (endStream) {
+                requestState = MessageState.COMPLETE;
+            } else {
+                final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
+                final boolean expectContinue = h != null && "100-continue".equalsIgnoreCase(h.getValue());
+                if (expectContinue) {
+                    requestState = MessageState.ACK;
+                    timeout = connection.getSocketTimeout();
+                    connection.setSocketTimeout(h1Config.getWaitForContinueTimeout());
+                } else {
+                    requestState = MessageState.BODY;
+                }
+            }
+        } else {
+            throw new HttpException("Request already committed");
+        }
+    }
+
+    void produceOutput() throws HttpException, IOException {
+        switch (requestState) {
+            case HEADERS:
+                exchangeHandler.produceRequest(new RequestChannel() {
+
+                    @Override
+                    public void sendRequest(
+                            final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
+                        commitRequest(request, entityDetails);
+                    }
+
+                });
+                break;
+            case ACK:
+                outputChannel.suspendOutput();
+                break;
+            case BODY:
+                exchangeHandler.produce(internalDataChannel);
+                break;
+        }
+    }
+
+    private void validateStatus(final HttpResponse response) throws ProtocolException {
+        if (response.getCode() < HttpStatus.SC_INFORMATIONAL) {
+            throw new ProtocolException("Invalid response code");
+        }
+        if (response.getCode() < HttpStatus.SC_SUCCESS) {
+            if (response.getCode() != HttpStatus.SC_CONTINUE) {
+                throw new ProtocolException("Unsupported intermediate response code");
+            }
+        }
+    }
+
+    void consumeHeader(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
+        if (done.get() || responseState != MessageState.HEADERS) {
+            throw new ProtocolException("Unexpected message head");
+        }
+
+        validateStatus(response);
+
+        if (requestState == MessageState.ACK) {
+            connection.setSocketTimeout(timeout);
+            requestState = MessageState.BODY;
+            if (response.getCode() < HttpStatus.SC_CLIENT_ERROR) {
+                exchangeHandler.produce(internalDataChannel);
+            }
+        }
+        if (response.getCode() == HttpStatus.SC_CONTINUE) {
+            return;
+        }
+        if (requestState == MessageState.BODY) {
+            boolean keepAlive = response.getCode() < HttpStatus.SC_CLIENT_ERROR;
+            if (keepAlive) {
+                final Header h = response.getFirstHeader(HttpHeaders.CONNECTION);
+                if (h != null && HeaderElements.CLOSE.equalsIgnoreCase(h.getValue())) {
+                    keepAlive = false;
+                }
+            }
+            if (!keepAlive) {
+                requestState = MessageState.COMPLETE;
+                outputChannel.abortOutput();
+            }
+        }
+
+        final EntityDetails entityDetails = endStream ? null : new LazyEntityDetails(response);
+        context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
+        httpProcessor.process(response, entityDetails, context);
+        receivedResponse = response;
+
+        exchangeHandler.consumeResponse(response, entityDetails);
+        responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
+    }
+
+    void consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+        if (done.get() || responseState != MessageState.BODY) {
+            throw new ProtocolException("Unexpected message data");
+        }
+        while (contentDecoder.read(inputBuffer) > 0) {
+            inputBuffer.flip();
+            final int capacity = exchangeHandler.consume(inputBuffer);
+            inputBuffer.clear();
+            if (capacity <= 0) {
+                if (!contentDecoder.isCompleted()) {
+                    outputChannel.suspendInput();
+                    exchangeHandler.updateCapacity(outputChannel);
+                }
+                break;
+            }
+        }
+        if (contentDecoder.isCompleted()) {
+            responseState = MessageState.COMPLETE;
+            exchangeHandler.streamEnd(null);
+        }
+    }
+
+    boolean handleTimeout() {
+        if (requestState == MessageState.ACK) {
+            requestState = MessageState.BODY;
+            connection.setSocketTimeout(timeout);
+            outputChannel.requestOutput();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    void failed(final Exception cause) {
+        exchangeHandler.failed(cause);
+    }
+
+    @Override
+    public void releaseResources() {
+        if (done.compareAndSet(false, true)) {
+            responseState = MessageState.COMPLETE;
+            requestState = MessageState.COMPLETE;
+            exchangeHandler.releaseResources();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "[" +
+                "requestState=" + requestState +
+                ", responseState=" + responseState +
+                ']';
+    }
+
+}
+

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Pipelined.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java Mon Oct 31 17:33:27 2016
@@ -26,14 +26,19 @@
  */
 package org.apache.hc.core5.http.impl.nio;
 
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Documented
-@Target({ElementType.TYPE})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Pipelined {
+import org.apache.hc.core5.http.HttpConnection;
+
+/**
+ * Connection event listener: receives connect, error and disconnect notifications for an {@link HttpConnection}.
+ *
+ * @since 5.0
+ */
+public interface ConnectionListener {
+
+    void onConnect(HttpConnection connection); // presumably invoked once the connection has been established - confirm against the caller
+
+    void onError(HttpConnection connection, Exception ex); // receives the affected connection together with the exception describing the error
+
+    void onDisconnect(HttpConnection connection); // presumably invoked after the connection has been closed - confirm against the caller
+
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ConnectionListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/DefaultHttpRequestFactory.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java Mon Oct 31 17:33:27 2016
@@ -25,23 +25,23 @@
  *
  */
 
-package org.apache.hc.core5.http.impl;
+package org.apache.hc.core5.http.impl.nio;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpRequestFactory;
 import org.apache.hc.core5.http.MethodNotSupportedException;
 import org.apache.hc.core5.http.ProtocolVersion;
-import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
 
 /**
- * Default factory for creating {@link ClassicHttpRequest} objects.
+ * Default factory for creating {@link HttpRequest} objects.
  *
  * @since 4.0
  */
 @Contract(threading = ThreadingBehavior.IMMUTABLE)
-public class DefaultHttpRequestFactory implements HttpRequestFactory<ClassicHttpRequest> {
+public class DefaultHttpRequestFactory implements HttpRequestFactory<HttpRequest> {
 
     public static final DefaultHttpRequestFactory INSTANCE = new DefaultHttpRequestFactory();
 
@@ -67,9 +67,9 @@ public class DefaultHttpRequestFactory i
     }
 
     @Override
-    public ClassicHttpRequest newHttpRequest(final ProtocolVersion transportVersion, final String method, final String uri) throws MethodNotSupportedException {
+    public HttpRequest newHttpRequest(final ProtocolVersion transportVersion, final String method, final String uri) throws MethodNotSupportedException {
         if (isOneOf(SUPPORTED_METHODS, method)) {
-            final ClassicHttpRequest request = new BasicClassicHttpRequest(method, uri);
+            final HttpRequest request = new BasicHttpRequest(method, uri);
             request.setVersion(transportVersion);
             return request;
         }
@@ -77,8 +77,8 @@ public class DefaultHttpRequestFactory i
     }
 
     @Override
-    public ClassicHttpRequest newHttpRequest(final String method, final String uri) throws MethodNotSupportedException {
-        return new BasicClassicHttpRequest(method, uri);
+    public HttpRequest newHttpRequest(final String method, final String uri) throws MethodNotSupportedException {
+        return new BasicHttpRequest(method, uri);
     }
 
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpRequestFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain