You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/08/01 19:07:00 UTC
[2/6] httpcomponents-core git commit: Protocol handling API
refactoring (no functional changes, mostly moving code to different packages)
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
deleted file mode 100644
index 6640697..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityConsumer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.entity.ContentInputStream;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
-
- private enum State { IDLE, ACTIVE, COMPLETED }
-
- private final Executor executor;
- private final SharedInputBuffer buffer;
- private final AtomicReference<State> state;
- private final AtomicReference<T> resultRef;
- private final AtomicReference<Exception> exceptionRef;
-
- public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
- this.executor = Args.notNull(executor, "Executor");
- this.buffer = new SharedInputBuffer(initialBufferSize);
- this.state = new AtomicReference<>(State.IDLE);
- this.resultRef = new AtomicReference<>(null);
- this.exceptionRef = new AtomicReference<>(null);
- }
-
- protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
-
- @Override
- public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- buffer.updateCapacity(capacityChannel);
- }
-
- @Override
- public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
- final ContentType contentType;
- try {
- contentType = ContentType.parse(entityDetails.getContentType());
- } catch (final UnsupportedCharsetException ex) {
- throw new UnsupportedEncodingException(ex.getMessage());
- }
- if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- final T result = consumeData(contentType, new ContentInputStream(buffer));
- resultRef.set(result);
- resultCallback.completed(result);
- } catch (final Exception ex) {
- buffer.abort();
- resultCallback.failed(ex);
- } finally {
- state.set(State.COMPLETED);
- }
- }
-
- });
- }
- }
-
- @Override
- public final int consume(final ByteBuffer src) throws IOException {
- return buffer.fill(src);
- }
-
- @Override
- public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- buffer.markEndStream();
- }
-
- @Override
- public final void failed(final Exception cause) {
- if (exceptionRef.compareAndSet(null, cause)) {
- releaseResources();
- }
- }
-
- public final Exception getException() {
- return exceptionRef.get();
- }
-
- @Override
- public final T getContent() {
- return resultRef.get();
- }
-
- @Override
- public void releaseResources() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
deleted file mode 100644
index 40768b1..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractClassicEntityProducer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityProducer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
-
- private enum State { IDLE, ACTIVE, COMPLETED }
-
- private final SharedOutputBuffer buffer;
- private final ContentType contentType;
- private final Executor executor;
- private final AtomicReference<State> state;
- private final AtomicReference<Exception> exception;
-
- public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
- this.buffer = new SharedOutputBuffer(initialBufferSize);
- this.contentType = contentType;
- this.executor = Args.notNull(executor, "Executor");
- this.state = new AtomicReference<>(State.IDLE);
- this.exception = new AtomicReference<>(null);
- }
-
- protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException;
-
- @Override
- public final int available() {
- return buffer.length();
- }
-
- @Override
- public final void produce(final DataStreamChannel channel) throws IOException {
- if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- produceData(contentType, new ContentOutputStream(buffer));
- buffer.writeCompleted();
- } catch (final Exception ex) {
- buffer.abort();
- } finally {
- state.set(State.COMPLETED);
- }
- }
-
- });
- }
- buffer.flush(channel);
- }
-
- @Override
- public final long getContentLength() {
- return -1;
- }
-
- @Override
- public final String getContentType() {
- return contentType != null ? contentType.toString() : null;
- }
-
- @Override
- public String getContentEncoding() {
- return null;
- }
-
- @Override
- public final boolean isChunked() {
- return false;
- }
-
- @Override
- public final Set<String> getTrailerNames() {
- return null;
- }
-
- @Override
- public final void failed(final Exception cause) {
- if (exception.compareAndSet(null, cause)) {
- releaseResources();
- }
- }
-
- public final Exception getException() {
- return exception.get();
- }
-
- @Override
- public void releaseResources() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
deleted file mode 100644
index 70fb961..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/AbstractSharedBuffer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-abstract class AbstractSharedBuffer extends ExpandableBuffer {
-
- final ReentrantLock lock;
- final Condition condition;
-
- volatile boolean endStream;
- volatile boolean aborted;
-
- public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(initialBufferSize);
- this.lock = Args.notNull(lock, "Lock");
- this.condition = lock.newCondition();
- }
-
- @Override
- public boolean hasData() {
- lock.lock();
- try {
- return super.hasData();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int capacity() {
- lock.lock();
- try {
- return super.capacity();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int length() {
- lock.lock();
- try {
- return super.length();
- } finally {
- lock.unlock();
- }
- }
-
- public void abort() {
- lock.lock();
- try {
- endStream = true;
- aborted = true;
- condition.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- public void reset() {
- if (aborted) {
- return;
- }
- lock.lock();
- try {
- setInputMode();
- buffer().clear();
- endStream = false;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isEndStream() {
- lock.lock();
- try {
- return endStream && !super.hasData();
- } finally {
- lock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
deleted file mode 100644
index 302982a..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedInputBuffer.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.entity.ContentInputBuffer;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
-
- private volatile CapacityChannel capacityChannel;
-
- public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(lock, initialBufferSize);
- }
-
- public SharedInputBuffer(final int buffersize) {
- super(new ReentrantLock(), buffersize);
- }
-
- public int fill(final ByteBuffer src) throws IOException {
- lock.lock();
- try {
- setInputMode();
- ensureCapacity(buffer().position() + src.remaining());
- buffer().put(src);
- final int remaining = buffer().remaining();
- condition.signalAll();
- return remaining;
- } finally {
- lock.unlock();
- }
- }
-
- public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- lock.lock();
- try {
- this.capacityChannel = capacityChannel;
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void awaitInput() throws InterruptedIOException {
- if (!buffer().hasRemaining()) {
- setInputMode();
- while (buffer().position() == 0 && !endStream && !aborted) {
- try {
- condition.await();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(ex.getMessage());
- }
- }
- setOutputMode();
- }
- }
-
- @Override
- public int read() throws IOException {
- lock.lock();
- try {
- setOutputMode();
- awaitInput();
- if (aborted) {
- return -1;
- }
- if (!buffer().hasRemaining() && endStream) {
- return -1;
- }
- final int b = buffer().get() & 0xff;
- if (!buffer().hasRemaining() && capacityChannel != null) {
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- }
- return b;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- lock.lock();
- try {
- setOutputMode();
- awaitInput();
- if (aborted) {
- return -1;
- }
- if (!buffer().hasRemaining() && endStream) {
- return -1;
- }
- final int chunk = Math.min(buffer().remaining(), len);
- buffer().get(b, off, chunk);
- if (!buffer().hasRemaining() && capacityChannel != null) {
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- }
- return chunk;
- } finally {
- lock.unlock();
- }
- }
-
- public void markEndStream() throws IOException {
- if (endStream) {
- return;
- }
- lock.lock();
- try {
- if (!endStream) {
- endStream = true;
- capacityChannel = null;
- condition.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
deleted file mode 100644
index 8d63def..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/entity/SharedOutputBuffer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.entity.ContentOutputBuffer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
-
- private volatile DataStreamChannel dataStreamChannel;
- private volatile boolean hasCapacity;
-
- public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(lock, initialBufferSize);
- this.hasCapacity = false;
- }
-
- public SharedOutputBuffer(final int buffersize) {
- this(new ReentrantLock(), buffersize);
- }
-
- public void flush(final DataStreamChannel channel) throws IOException {
- lock.lock();
- try {
- dataStreamChannel = channel;
- hasCapacity = true;
- setOutputMode();
- if (buffer().hasRemaining()) {
- dataStreamChannel.write(buffer());
- }
- if (!buffer().hasRemaining() && endStream) {
- dataStreamChannel.endStream();
- }
- condition.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- private void ensureNotAborted() throws InterruptedIOException {
- if (aborted) {
- throw new InterruptedIOException("Operation aborted");
- }
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len) throws IOException {
- final ByteBuffer src = ByteBuffer.wrap(b, off, len);
- lock.lock();
- try {
- ensureNotAborted();
- setInputMode();
- while (src.hasRemaining()) {
- // always buffer small chunks
- if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
- buffer().put(src);
- } else {
- if (buffer().position() > 0 || dataStreamChannel == null) {
- waitFlush();
- }
- if (buffer().position() == 0 && dataStreamChannel != null) {
- final int bytesWritten = dataStreamChannel.write(src);
- if (bytesWritten == 0) {
- hasCapacity = false;
- waitFlush();
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void write(final int b) throws IOException {
- lock.lock();
- try {
- ensureNotAborted();
- setInputMode();
- if (!buffer().hasRemaining()) {
- waitFlush();
- }
- buffer().put((byte)b);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void writeCompleted() throws IOException {
- if (endStream) {
- return;
- }
- lock.lock();
- try {
- if (!endStream) {
- endStream = true;
- if (dataStreamChannel != null) {
- setOutputMode();
- if (buffer().hasRemaining()) {
- dataStreamChannel.requestOutput();
- } else {
- dataStreamChannel.endStream();
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void waitFlush() throws InterruptedIOException {
- setOutputMode();
- if (dataStreamChannel != null) {
- dataStreamChannel.requestOutput();
- }
- ensureNotAborted();
- while (buffer().hasRemaining() || !hasCapacity) {
- try {
- condition.await();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(ex.getMessage());
- }
- ensureNotAborted();
- }
- setInputMode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
deleted file mode 100644
index 2a5c292..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/io/UriHttpRequestHandlerMapper.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.io;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.http.protocol.LookupRegistry;
-import org.apache.hc.core5.http.protocol.UriPatternMatcher;
-import org.apache.hc.core5.util.Args;
-
-/**
- * Maintains a map of HTTP request handlers keyed by a request URI pattern.
- * <br>
- * Patterns may have three formats:
- * <ul>
- * <li>{@code *}</li>
- * <li>{@code *<uri>}</li>
- * <li>{@code <uri>*}</li>
- * </ul>
- * <br>
- * This class can be used to map an instance of
- * {@link HttpRequestHandler} matching a particular request URI. Usually the
- * mapped request handler will be used to process the request with the
- * specified request URI.
- *
- * @since 4.3
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public class UriHttpRequestHandlerMapper implements HttpRequestHandlerMapper {
-
- private final LookupRegistry<HttpRequestHandler> matcher;
-
- public UriHttpRequestHandlerMapper(final LookupRegistry<HttpRequestHandler> matcher) {
- super();
- this.matcher = Args.notNull(matcher, "Pattern matcher");
- }
-
- public UriHttpRequestHandlerMapper() {
- this(new UriPatternMatcher<HttpRequestHandler>());
- }
-
- /**
- * Registers the given {@link HttpRequestHandler} as a handler for URIs
- * matching the given pattern.
- *
- * @param pattern the pattern to register the handler for.
- * @param handler the handler.
- */
- public void register(final String pattern, final HttpRequestHandler handler) {
- Args.notNull(pattern, "Pattern");
- Args.notNull(handler, "Handler");
- matcher.register(pattern, handler);
- }
-
- /**
- * Removes registered handler, if exists, for the given pattern.
- *
- * @param pattern the pattern to unregister the handler for.
- */
- public void unregister(final String pattern) {
- matcher.unregister(pattern);
- }
-
- /**
- * Extracts request path from the given {@link HttpRequest}
- */
- protected String getRequestPath(final HttpRequest request) {
- String uriPath = request.getPath();
- int index = uriPath.indexOf("?");
- if (index != -1) {
- uriPath = uriPath.substring(0, index);
- } else {
- index = uriPath.indexOf("#");
- if (index != -1) {
- uriPath = uriPath.substring(0, index);
- }
- }
- return uriPath;
- }
-
- /**
- * Looks up a handler matching the given request URI.
- *
- * @param request the request
- * @return handler or {@code null} if no match is found.
- */
- @Override
- public HttpRequestHandler lookup(final HttpRequest request, final HttpContext context) {
- Args.notNull(request, "HTTP request");
- return matcher.lookup(getRequestPath(request));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java b/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
new file mode 100644
index 0000000..ec3d62c
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/io/support/UriHttpRequestHandlerMapper.java
@@ -0,0 +1,122 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.io.support;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.HttpRequestHandlerMapper;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.LookupRegistry;
+import org.apache.hc.core5.http.protocol.UriPatternMatcher;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Maintains a map of HTTP request handlers keyed by a request URI pattern.
+ * <br>
+ * Patterns may have three formats:
+ * <ul>
+ * <li>{@code *}</li>
+ * <li>{@code *<uri>}</li>
+ * <li>{@code <uri>*}</li>
+ * </ul>
+ * <br>
+ * This class can be used to map an instance of
+ * {@link HttpRequestHandler} matching a particular request URI. Usually the
+ * mapped request handler will be used to process the request with the
+ * specified request URI.
+ *
+ * @since 4.3
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public class UriHttpRequestHandlerMapper implements HttpRequestHandlerMapper {
+
+ private final LookupRegistry<HttpRequestHandler> matcher;
+
+ public UriHttpRequestHandlerMapper(final LookupRegistry<HttpRequestHandler> matcher) {
+ super();
+ this.matcher = Args.notNull(matcher, "Pattern matcher");
+ }
+
+ public UriHttpRequestHandlerMapper() {
+ this(new UriPatternMatcher<HttpRequestHandler>());
+ }
+
+ /**
+ * Registers the given {@link HttpRequestHandler} as a handler for URIs
+ * matching the given pattern.
+ *
+ * @param pattern the pattern to register the handler for.
+ * @param handler the handler.
+ */
+ public void register(final String pattern, final HttpRequestHandler handler) {
+ Args.notNull(pattern, "Pattern");
+ Args.notNull(handler, "Handler");
+ matcher.register(pattern, handler);
+ }
+
+ /**
+ * Removes registered handler, if exists, for the given pattern.
+ *
+ * @param pattern the pattern to unregister the handler for.
+ */
+ public void unregister(final String pattern) {
+ matcher.unregister(pattern);
+ }
+
+ /**
+ * Extracts request path from the given {@link HttpRequest}
+ */
+ protected String getRequestPath(final HttpRequest request) {
+ String uriPath = request.getPath();
+ int index = uriPath.indexOf("?");
+ if (index != -1) {
+ uriPath = uriPath.substring(0, index);
+ } else {
+ index = uriPath.indexOf("#");
+ if (index != -1) {
+ uriPath = uriPath.substring(0, index);
+ }
+ }
+ return uriPath;
+ }
+
+ /**
+ * Looks up a handler matching the given request URI.
+ *
+ * @param request the request
+ * @return handler or {@code null} if no match is found.
+ */
+ @Override
+ public HttpRequestHandler lookup(final HttpRequest request, final HttpContext context) {
+ Args.notNull(request, "HTTP request");
+ return matcher.lookup(getRequestPath(request));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
index ec07515..19f875e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientExchangeHandler.java
@@ -38,7 +38,7 @@ import org.apache.hc.core5.http.HttpResponse;
*
* @since 5.0
*/
-public interface AsyncClientExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+public interface AsyncClientExchangeHandler extends AsyncDataExchangeHandler {
void produceRequest(RequestChannel channel) throws HttpException, IOException;
@@ -46,8 +46,6 @@ public interface AsyncClientExchangeHandler extends AsyncDataConsumer, AsyncData
void consumeInformation(HttpResponse response) throws HttpException, IOException;
- void failed(Exception cause);
-
void cancel();
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
new file mode 100644
index 0000000..3bc09e5
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncDataExchangeHandler.java
@@ -0,0 +1,39 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+/**
+ * Abstract asynchronous data exchange handler that acts as a data consumer
+ * and a data producer.
+ * <p>
+ * Besides the methods inherited from {@link AsyncDataConsumer} and
+ * {@link AsyncDataProducer} it declares {@link #failed(Exception)}.
+ * {@link AsyncClientExchangeHandler} and {@link AsyncServerExchangeHandler}
+ * both extend this interface.
+ *
+ * @since 5.0
+ */
+public interface AsyncDataExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+
+ void failed(Exception cause); // presumably invoked to report that the exchange failed with the given cause - TODO confirm against callers
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
index 1d64bd3..71d0e8f 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncRequestProducer.java
@@ -26,8 +26,9 @@
*/
package org.apache.hc.core5.http.nio;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HttpRequest;
+import java.io.IOException;
+
+import org.apache.hc.core5.http.HttpException;
/**
* Abstract asynchronous request producer.
@@ -36,9 +37,7 @@ import org.apache.hc.core5.http.HttpRequest;
*/
public interface AsyncRequestProducer extends AsyncDataProducer {
- HttpRequest produceRequest();
-
- EntityDetails getEntityDetails();
+ void sendRequest(RequestChannel requestChannel) throws HttpException, IOException;
void failed(Exception cause);
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
index 3b35b35..8796ae1 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncResponseProducer.java
@@ -26,8 +26,9 @@
*/
package org.apache.hc.core5.http.nio;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HttpResponse;
+import java.io.IOException;
+
+import org.apache.hc.core5.http.HttpException;
/**
* Abstract asynchronous response producer.
@@ -36,9 +37,7 @@ import org.apache.hc.core5.http.HttpResponse;
*/
public interface AsyncResponseProducer extends AsyncDataProducer {
- HttpResponse produceResponse();
-
- EntityDetails getEntityDetails();
+ void sendResponse(ResponseChannel channel) throws HttpException, IOException;
void failed(Exception cause);
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
index 4ec752c..4f0758e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerExchangeHandler.java
@@ -38,13 +38,11 @@ import org.apache.hc.core5.http.HttpRequest;
*
* @since 5.0
*/
-public interface AsyncServerExchangeHandler extends AsyncDataConsumer, AsyncDataProducer {
+public interface AsyncServerExchangeHandler extends AsyncDataExchangeHandler {
void handleRequest(
HttpRequest request,
EntityDetails entityDetails,
ResponseChannel responseChannel) throws HttpException, IOException;
- void failed(Exception cause);
-
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
new file mode 100644
index 0000000..91ed25a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerRequestHandler.java
@@ -0,0 +1,47 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+/**
+ * Server side request handler working with request messages of type
+ * {@code T}.
+ * <p>
+ * {@link #prepare(HttpRequest, HttpContext)} supplies the
+ * {@link AsyncRequestConsumer} for a request, and
+ * {@link #handle(Object, AsyncServerResponseTrigger, HttpContext)} is given
+ * a request message together with an {@link AsyncServerResponseTrigger}.
+ * Presumably the message passed to {@code handle} is the one produced by
+ * the consumer returned from {@code prepare} (to be confirmed against the
+ * calling code).
+ * <p>
+ * Implementations are declared thread-safe through the {@link Contract}
+ * annotation.
+ *
+ * @param <T> the type of the request message.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public interface AsyncServerRequestHandler<T> {
+
+ AsyncRequestConsumer<T> prepare(HttpRequest request, HttpContext context) throws HttpException;
+
+ void handle(T requestMessage, AsyncServerResponseTrigger responseTrigger, HttpContext context) throws HttpException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
new file mode 100644
index 0000000..db2e42a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncServerResponseTrigger.java
@@ -0,0 +1,49 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+
+/**
+ * Response side counterpart of {@link AsyncServerRequestHandler}: an
+ * instance of this interface is the {@code responseTrigger} argument of
+ * {@link AsyncServerRequestHandler#handle(Object, AsyncServerResponseTrigger, HttpContext)}.
+ * <p>
+ * It declares three operations: sending an {@link HttpResponse} as
+ * information, submitting an {@link AsyncResponseProducer}, and pushing a
+ * promised {@link HttpRequest} together with an {@link AsyncPushProducer}.
+ * <p>
+ * Implementations are declared thread-safe through the {@link Contract}
+ * annotation.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public interface AsyncServerResponseTrigger {
+
+ void sendInformation(HttpResponse response) throws HttpException, IOException; // presumably meant for intermediate (1xx) responses - TODO confirm
+
+ void submitResponse(AsyncResponseProducer responseProducer) throws HttpException, IOException; // hands over the producer of the response
+
+ void pushPromise(HttpRequest promise, AsyncPushProducer responseProducer) throws HttpException, IOException; // presumably server push: promised request plus the producer of the pushed response - TODO confirm
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
index 8e850c6..c07d979 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicRequestProducer.java
@@ -29,7 +29,7 @@ package org.apache.hc.core5.http.nio;
import java.io.IOException;
import java.net.URI;
-import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.message.BasicHttpRequest;
@@ -61,13 +61,8 @@ public class BasicRequestProducer implements AsyncRequestProducer {
}
@Override
- public HttpRequest produceRequest() {
- return request;
- }
-
- @Override
- public EntityDetails getEntityDetails() {
- return dataProducer;
+ public void sendRequest(final RequestChannel requestChannel) throws HttpException, IOException {
+ requestChannel.sendRequest(request, dataProducer);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
index 055cf6b..a5cd1ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/BasicResponseProducer.java
@@ -28,10 +28,10 @@ package org.apache.hc.core5.http.nio;
import java.io.IOException;
-import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
-import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.util.Args;
@@ -79,13 +79,8 @@ public class BasicResponseProducer implements AsyncResponseProducer {
}
@Override
- public HttpResponse produceResponse() {
- return response;
- }
-
- @Override
- public EntityDetails getEntityDetails() {
- return dataProducer;
+ public void sendResponse(final ResponseChannel responseChannel) throws HttpException, IOException {
+ responseChannel.sendResponse(response, dataProducer);
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
new file mode 100644
index 0000000..afd75ab
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
@@ -0,0 +1,134 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link AsyncEntityConsumer} that lets subclasses read the entity through a
+ * blocking {@link InputStream}.
+ * <p>
+ * Data handed to {@link #consume(ByteBuffer)} is written into an internal
+ * {@link SharedInputBuffer}. The first call to {@code streamStart} submits
+ * a task to the {@link Executor} given at construction; that task calls
+ * {@link #consumeData(ContentType, InputStream)} with a
+ * {@code ContentInputStream} over the buffer, stores the returned value
+ * (available from {@link #getContent()}) and completes the
+ * {@link FutureCallback}. If the task throws, the buffer is aborted and the
+ * callback is failed with that exception.
+ * <p>
+ * {@link #getException()} returns only the first cause passed to
+ * {@link #failed(Exception)}; an exception thrown by the task is reported
+ * to the callback but is not stored there.
+ *
+ * @param <T> the type of the result produced by {@code consumeData}.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
+
+ private enum State { IDLE, ACTIVE, COMPLETED }
+
+ private final Executor executor;
+ private final SharedInputBuffer buffer;
+ private final AtomicReference<State> state;
+ private final AtomicReference<T> resultRef;
+ private final AtomicReference<Exception> exceptionRef;
+
+ public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
+ this.executor = Args.notNull(executor, "Executor");
+ this.buffer = new SharedInputBuffer(initialBufferSize);
+ this.state = new AtomicReference<>(State.IDLE);
+ this.resultRef = new AtomicReference<>(null);
+ this.exceptionRef = new AtomicReference<>(null);
+ }
+
+ protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException; // invoked from the task submitted to the executor in streamStart
+
+ @Override
+ public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ buffer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
+ final ContentType contentType;
+ try {
+ contentType = ContentType.parse(entityDetails.getContentType());
+ } catch (final UnsupportedCharsetException ex) {
+ throw new UnsupportedEncodingException(ex.getMessage()); // unchecked charset failure is rethrown as a checked exception carrying only the message
+ }
+ if (state.compareAndSet(State.IDLE, State.ACTIVE)) { // only the call that moves IDLE to ACTIVE submits the task
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ final T result = consumeData(contentType, new ContentInputStream(buffer));
+ resultRef.set(result);
+ resultCallback.completed(result);
+ } catch (final Exception ex) {
+ buffer.abort();
+ resultCallback.failed(ex); // note: ex is not written to exceptionRef
+ } finally {
+ state.set(State.COMPLETED);
+ }
+ }
+
+ });
+ }
+ }
+
+ @Override
+ public final int consume(final ByteBuffer src) throws IOException {
+ return buffer.fill(src);
+ }
+
+ @Override
+ public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ buffer.markEndStream(); // trailers are not used
+ }
+
+ @Override
+ public final void failed(final Exception cause) {
+ if (exceptionRef.compareAndSet(null, cause)) { // only the first reported cause is kept and triggers releaseResources()
+ releaseResources();
+ }
+ }
+
+ public final Exception getException() {
+ return exceptionRef.get();
+ }
+
+ @Override
+ public final T getContent() {
+ return resultRef.get();
+ }
+
+ @Override
+ public void releaseResources() { // empty here and not final, so subclasses may override it
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
new file mode 100644
index 0000000..5a4b19a
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
@@ -0,0 +1,130 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link AsyncEntityProducer} that lets subclasses write the entity through
+ * a blocking {@link OutputStream}.
+ * <p>
+ * The first call to {@link #produce(DataStreamChannel)} submits a task to
+ * the {@link Executor} given at construction; that task calls
+ * {@link #produceData(ContentType, OutputStream)} with a
+ * {@code ContentOutputStream} over an internal {@link SharedOutputBuffer}
+ * and then marks the write as completed. Every call to {@code produce}
+ * flushes the buffer to the supplied channel.
+ * <p>
+ * The entity reports a content length of {@code -1}, is not chunked, has no
+ * content encoding and no trailer names; the content type is the string form
+ * of the {@link ContentType} given at construction, or {@code null}.
+ * <p>
+ * {@link #getException()} returns only the first cause passed to
+ * {@link #failed(Exception)}.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
+
+ private enum State { IDLE, ACTIVE, COMPLETED }
+
+ private final SharedOutputBuffer buffer;
+ private final ContentType contentType;
+ private final Executor executor;
+ private final AtomicReference<State> state;
+ private final AtomicReference<Exception> exception;
+
+ public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
+ this.buffer = new SharedOutputBuffer(initialBufferSize);
+ this.contentType = contentType; // may be null, see getContentType()
+ this.executor = Args.notNull(executor, "Executor");
+ this.state = new AtomicReference<>(State.IDLE);
+ this.exception = new AtomicReference<>(null);
+ }
+
+ protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException; // invoked from the task submitted to the executor in produce
+
+ @Override
+ public final int available() {
+ return buffer.length();
+ }
+
+ @Override
+ public final void produce(final DataStreamChannel channel) throws IOException {
+ if (state.compareAndSet(State.IDLE, State.ACTIVE)) { // only the call that moves IDLE to ACTIVE submits the task
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ produceData(contentType, new ContentOutputStream(buffer));
+ buffer.writeCompleted();
+ } catch (final Exception ex) {
+ buffer.abort(); // NOTE(review): ex is neither stored nor reported; getException() will not reflect it
+ } finally {
+ state.set(State.COMPLETED);
+ }
+ }
+
+ });
+ }
+ buffer.flush(channel);
+ }
+
+ @Override
+ public final long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public final String getContentType() {
+ return contentType != null ? contentType.toString() : null;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return null;
+ }
+
+ @Override
+ public final boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public final Set<String> getTrailerNames() {
+ return null;
+ }
+
+ @Override
+ public final void failed(final Exception cause) {
+ if (exception.compareAndSet(null, cause)) { // only the first reported cause is kept and triggers releaseResources()
+ releaseResources();
+ }
+ }
+
+ public final Exception getException() {
+ return exception.get();
+ }
+
+ @Override
+ public void releaseResources() { // empty here and not final, so subclasses may override it
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
new file mode 100644
index 0000000..5339580
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Package-private base class of the shared buffers in this package.
+ * <p>
+ * It extends {@link ExpandableBuffer} and re-declares {@link #hasData()},
+ * {@link #capacity()} and {@link #length()} so that each runs while holding
+ * the {@link ReentrantLock} passed to the constructor. It also holds a
+ * {@link Condition} created from that lock and two volatile flags,
+ * {@code endStream} and {@code aborted}, which subclasses read and write
+ * directly.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+abstract class AbstractSharedBuffer extends ExpandableBuffer {
+
+ final ReentrantLock lock;
+ final Condition condition;
+
+ volatile boolean endStream;
+ volatile boolean aborted;
+
+ public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(initialBufferSize);
+ this.lock = Args.notNull(lock, "Lock");
+ this.condition = lock.newCondition();
+ }
+
+ @Override
+ public boolean hasData() {
+ lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int capacity() {
+ lock.lock();
+ try {
+ return super.capacity();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int length() {
+ lock.lock();
+ try {
+ return super.length();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void abort() {
+ lock.lock();
+ try {
+ endStream = true; // an aborted buffer is also marked as ended
+ aborted = true;
+ condition.signalAll(); // wake every thread waiting on the condition so it can observe the flags
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void reset() {
+ if (aborted) { // checked before the lock is taken; an aborted buffer is left as it is
+ return;
+ }
+ lock.lock();
+ try {
+ setInputMode();
+ buffer().clear();
+ endStream = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isEndStream() {
+ lock.lock();
+ try {
+ return endStream && !super.hasData(); // true only once the end flag is set and no data is left
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
new file mode 100644
index 0000000..7303f70
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
@@ -0,0 +1,163 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+
+/**
+ * {@link ContentInputBuffer} built on {@link AbstractSharedBuffer}: one side
+ * puts data in with {@link #fill(ByteBuffer)} while the other side takes it
+ * out with the blocking {@code read} methods. All operations run while
+ * holding the lock inherited from the base class.
+ * <p>
+ * The {@code read} methods wait on the shared condition until
+ * {@code fill}, {@link #markEndStream()} or {@code abort()} signals it. They
+ * return {@code -1} when the buffer has been aborted, or when the end of
+ * stream has been marked and nothing is left to read.
+ * <p>
+ * The most recent {@link CapacityChannel} passed to
+ * {@link #updateCapacity(CapacityChannel)} is remembered. Once a read has
+ * taken the last available byte, that channel is updated with
+ * {@code buffer().remaining()} as measured after switching to input mode
+ * (presumably the free space of the buffer; this depends on
+ * {@code ExpandableBuffer} semantics not visible here).
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
+
+ private volatile CapacityChannel capacityChannel;
+
+ public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(lock, initialBufferSize);
+ }
+
+ public SharedInputBuffer(final int buffersize) {
+ super(new ReentrantLock(), buffersize); // this variant creates its own private lock
+ }
+
+ public int fill(final ByteBuffer src) throws IOException {
+ lock.lock();
+ try {
+ setInputMode();
+ ensureCapacity(buffer().position() + src.remaining()); // request room for the current position plus everything left in src
+ buffer().put(src);
+ final int remaining = buffer().remaining();
+ condition.signalAll(); // wake readers blocked in awaitInput()
+ return remaining;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ lock.lock();
+ try {
+ this.capacityChannel = capacityChannel; // remembered so the read methods can report capacity later
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void awaitInput() throws InterruptedIOException { // both read methods call this after taking the lock
+ if (!buffer().hasRemaining()) {
+ setInputMode();
+ while (buffer().position() == 0 && !endStream && !aborted) { // wait until the position moves off zero, or the stream is ended or aborted
+ try {
+ condition.await();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the interrupt status before converting the exception
+ throw new InterruptedIOException(ex.getMessage());
+ }
+ }
+ setOutputMode();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ lock.lock();
+ try {
+ setOutputMode();
+ awaitInput();
+ if (aborted) {
+ return -1;
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ return -1;
+ }
+ final int b = buffer().get() & 0xff; // mask so the byte is returned as a value in 0..255
+ if (!buffer().hasRemaining() && capacityChannel != null) { // last available byte taken: report capacity to the remembered channel
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ }
+ return b;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ lock.lock();
+ try {
+ setOutputMode();
+ awaitInput();
+ if (aborted) {
+ return -1;
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ return -1;
+ }
+ final int chunk = Math.min(buffer().remaining(), len); // never copies more than is currently available
+ buffer().get(b, off, chunk);
+ if (!buffer().hasRemaining() && capacityChannel != null) { // last available byte taken: report capacity to the remembered channel
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ }
+ return chunk;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void markEndStream() throws IOException {
+ if (endStream) { // checked first without the lock, then again while holding it
+ return;
+ }
+ lock.lock();
+ try {
+ if (!endStream) {
+ endStream = true;
+ capacityChannel = null; // no further capacity updates once the stream has ended
+ condition.signalAll(); // wake readers so they can observe the end of stream
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
new file mode 100644
index 0000000..f5da5f7
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
@@ -0,0 +1,165 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.entity;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
+
+ private volatile DataStreamChannel dataStreamChannel;
+ private volatile boolean hasCapacity;
+
+ public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(lock, initialBufferSize);
+ this.hasCapacity = false;
+ }
+
+ public SharedOutputBuffer(final int buffersize) {
+ this(new ReentrantLock(), buffersize);
+ }
+
+ public void flush(final DataStreamChannel channel) throws IOException {
+ lock.lock();
+ try {
+ dataStreamChannel = channel;
+ hasCapacity = true;
+ setOutputMode();
+ if (buffer().hasRemaining()) {
+ dataStreamChannel.write(buffer());
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ dataStreamChannel.endStream();
+ }
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void ensureNotAborted() throws InterruptedIOException {
+ if (aborted) {
+ throw new InterruptedIOException("Operation aborted");
+ }
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ final ByteBuffer src = ByteBuffer.wrap(b, off, len);
+ lock.lock();
+ try {
+ ensureNotAborted();
+ setInputMode();
+ while (src.hasRemaining()) {
+ // always buffer small chunks
+ if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
+ buffer().put(src);
+ } else {
+ if (buffer().position() > 0 || dataStreamChannel == null) {
+ waitFlush();
+ }
+ if (buffer().position() == 0 && dataStreamChannel != null) {
+ final int bytesWritten = dataStreamChannel.write(src);
+ if (bytesWritten == 0) {
+ hasCapacity = false;
+ waitFlush();
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ lock.lock();
+ try {
+ ensureNotAborted();
+ setInputMode();
+ if (!buffer().hasRemaining()) {
+ waitFlush();
+ }
+ buffer().put((byte)b);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void writeCompleted() throws IOException {
+ if (endStream) {
+ return;
+ }
+ lock.lock();
+ try {
+ if (!endStream) {
+ endStream = true;
+ if (dataStreamChannel != null) {
+ setOutputMode();
+ if (buffer().hasRemaining()) {
+ dataStreamChannel.requestOutput();
+ } else {
+ dataStreamChannel.endStream();
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void waitFlush() throws InterruptedIOException {
+ setOutputMode();
+ if (dataStreamChannel != null) {
+ dataStreamChannel.requestOutput();
+ }
+ ensureNotAborted();
+ while (buffer().hasRemaining() || !hasCapacity) {
+ try {
+ condition.await();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(ex.getMessage());
+ }
+ ensureNotAborted();
+ }
+ setInputMode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/9f39bfdc/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
new file mode 100644
index 0000000..75a9101
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
@@ -0,0 +1,302 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.HttpContextAware;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.entity.ContentInputStream;
+import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
+import org.apache.hc.core5.http.nio.entity.SharedInputBuffer;
+import org.apache.hc.core5.http.nio.entity.SharedOutputBuffer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * Server-side exchange handler that bridges the asynchronous exchange callbacks to
+ * a blocking, stream-based {@link #handle} method. The request body (if any) is
+ * exposed as an {@link InputStream} backed by a {@link SharedInputBuffer}, the
+ * response body as an {@link OutputStream} backed by a {@link SharedOutputBuffer},
+ * and {@link #handle} is run on the supplied {@link Executor}.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicServerExchangeHandler implements HttpContextAware, AsyncServerExchangeHandler {
+
+ // IDLE until handleRequest() schedules the handler, ACTIVE while it is scheduled or running, COMPLETED afterwards.
+ private enum State { IDLE, ACTIVE, COMPLETED }
+
+ private final int initialBufferSize;
+ private final Executor executor;
+ private final AtomicReference<State> state;
+ // First exception recorded either by the handler thread or through failed().
+ private final AtomicReference<Exception> exception;
+
+ private volatile HttpContext context;
+ // Created in handleRequest(); the input buffer stays null when the request has no entity.
+ private volatile SharedInputBuffer inputBuffer;
+ private volatile SharedOutputBuffer outputBuffer;
+
+ /**
+ * @param initialBufferSize the initial size of the input and output buffers; must be positive.
+ * @param executor the executor used to run {@link #handle}; must not be {@code null}.
+ */
+ public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
+ this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
+ this.executor = Args.notNull(executor, "Executor");
+ this.exception = new AtomicReference<>(null);
+ this.state = new AtomicReference<>(State.IDLE);
+ }
+
+ /**
+ * @return the first exception recorded for this exchange, or {@code null} if there is none.
+ */
+ public Exception getException() {
+ return exception.get();
+ }
+
+ @Override
+ public void setContext(final HttpContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Processes the request using blocking streams. Invoked on a thread of the executor.
+ *
+ * @param request the incoming request.
+ * @param requestStream the request body, or {@code null} if the request has no entity.
+ * @param response the response head; its mutators fail once the response has been committed.
+ * @param responseStream the response body; the first write or close commits the response.
+ * @param context the context set through {@link #setContext(HttpContext)}.
+ */
+ protected abstract void handle(
+ HttpRequest request, InputStream requestStream,
+ HttpResponse response, OutputStream responseStream,
+ HttpContext context) throws IOException, HttpException;
+
+ /**
+ * Sets up the buffers and streams for the exchange and schedules {@link #handle}
+ * on the executor. If the request carries an entity and an
+ * {@code Expect: 100-continue} header, a 100 response is sent first.
+ *
+ * @param request the incoming request.
+ * @param entityDetails details of the request entity, or {@code null} if there is none.
+ * @param responseChannel the channel used to send the interim and final response heads.
+ */
+ @Override
+ public final void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel) throws HttpException, IOException {
+
+ if (entityDetails != null) {
+ final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
+ if (h != null && "100-continue".equalsIgnoreCase(h.getValue())) {
+ responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE));
+ }
+ }
+ final AtomicBoolean responseCommitted = new AtomicBoolean(false);
+
+ final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+ // Wrapper handed to handle(): rejects changes to the response head after it has been sent.
+ final HttpResponse responseWrapper = new HttpResponseWrapper(response){
+
+ private void ensureNotCommitted() {
+ Asserts.check(!responseCommitted.get(), "Response already committed");
+ }
+
+ @Override
+ public void addHeader(final String name, final Object value) {
+ ensureNotCommitted();
+ super.addHeader(name, value);
+ }
+
+ @Override
+ public void setHeader(final String name, final Object value) {
+ ensureNotCommitted();
+ super.setHeader(name, value);
+ }
+
+ @Override
+ public void setVersion(final ProtocolVersion version) {
+ ensureNotCommitted();
+ super.setVersion(version);
+ }
+
+ @Override
+ public void setCode(final int code) {
+ ensureNotCommitted();
+ super.setCode(code);
+ }
+
+ @Override
+ public void setReasonPhrase(final String reason) {
+ ensureNotCommitted();
+ super.setReasonPhrase(reason);
+ }
+
+ @Override
+ public void setLocale(final Locale locale) {
+ ensureNotCommitted();
+ super.setLocale(locale);
+ }
+
+ };
+
+ final InputStream inputStream;
+ if (entityDetails != null) {
+ inputBuffer = new SharedInputBuffer(initialBufferSize);
+ inputStream = new ContentInputStream(inputBuffer);
+ } else {
+ inputStream = null;
+ }
+ outputBuffer = new SharedOutputBuffer(initialBufferSize);
+
+ // Output stream handed to handle(): the first write or close sends the response head.
+ final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
+
+ private void triggerResponse() throws IOException {
+ try {
+ // The compare-and-set guarantees the response head is sent at most once.
+ if (responseCommitted.compareAndSet(false, true)) {
+ responseChannel.sendResponse(response, new EntityDetails() {
+
+ @Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public String getContentType() {
+ final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+ return h != null ? h.getValue() : null;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+ return h != null ? h.getValue() : null;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return null;
+ }
+
+ });
+ }
+ } catch (final HttpException ex) {
+ throw new IOException(ex.getMessage(), ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ triggerResponse();
+ super.close();
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ triggerResponse();
+ super.write(b, off, len);
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ triggerResponse();
+ super.write(b);
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ triggerResponse();
+ super.write(b);
+ }
+
+ };
+
+ // Only the first request seen by this handler instance gets scheduled.
+ if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handle(request, inputStream, responseWrapper, outputStream, context);
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ outputStream.close();
+ } catch (final Exception ex) {
+ exception.compareAndSet(null, ex);
+ if (inputBuffer != null) {
+ inputBuffer.abort();
+ }
+ outputBuffer.abort();
+ } finally {
+ state.set(State.COMPLETED);
+ }
+ }
+
+ });
+ }
+ }
+
+ @Override
+ public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ if (inputBuffer != null) {
+ inputBuffer.updateCapacity(capacityChannel);
+ }
+ }
+
+ @Override
+ public final int consume(final ByteBuffer src) throws IOException {
+ Asserts.notNull(inputBuffer, "Input buffer");
+ return inputBuffer.fill(src);
+ }
+
+ @Override
+ public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ Asserts.notNull(inputBuffer, "Input buffer");
+ inputBuffer.markEndStream();
+ }
+
+ @Override
+ public final int available() {
+ Asserts.notNull(outputBuffer, "Output buffer");
+ return outputBuffer.length();
+ }
+
+ @Override
+ public final void produce(final DataStreamChannel channel) throws IOException {
+ Asserts.notNull(outputBuffer, "Output buffer");
+ outputBuffer.flush(channel);
+ }
+
+ /**
+ * Records the failure (unless an exception has been recorded already), aborts the
+ * shared buffers and releases resources.
+ *
+ * @param cause the reason the exchange failed.
+ */
+ @Override
+ public final void failed(final Exception cause) {
+ exception.compareAndSet(null, cause);
+ // The handler thread may be blocked reading the request or waiting for a flush
+ // that will never come once the exchange has failed. Abort both buffers so it
+ // can terminate. NOTE(review): assumes abort() wakes up waiting threads and can
+ // safely be called more than once - confirm against AbstractSharedBuffer.
+ final SharedInputBuffer in = inputBuffer;
+ if (in != null) {
+ in.abort();
+ }
+ final SharedOutputBuffer out = outputBuffer;
+ if (out != null) {
+ out.abort();
+ }
+ releaseResources();
+ }
+
+ /**
+ * Does nothing by default; subclasses may override to release their own resources.
+ */
+ @Override
+ public void releaseResources() {
+ }
+
+}