You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/21 15:01:37 UTC
[26/32] httpcomponents-core git commit: Moved classes (no functional
changes)
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
new file mode 100644
index 0000000..78ae034
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
@@ -0,0 +1,285 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * @since 5.0
+ */
+public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
+
+ private enum State { IDLE, ACTIVE, COMPLETED }
+
+ private final int initialBufferSize;
+ private final Executor executor;
+ private final AtomicReference<State> state;
+ private final AtomicReference<Exception> exception;
+
+ private volatile SharedInputBuffer inputBuffer;
+ private volatile SharedOutputBuffer outputBuffer;
+
+ public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
+ this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
+ this.executor = Args.notNull(executor, "Executor");
+ this.exception = new AtomicReference<>(null);
+ this.state = new AtomicReference<>(State.IDLE);
+ }
+
+ public Exception getException() {
+ return exception.get();
+ }
+
+ protected abstract void handle(
+ HttpRequest request, InputStream requestStream,
+ HttpResponse response, OutputStream responseStream,
+ HttpContext context) throws IOException, HttpException;
+
+ @Override
+ public final void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context) throws HttpException, IOException {
+ final AtomicBoolean responseCommitted = new AtomicBoolean(false);
+
+ final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+ final HttpResponse responseWrapper = new HttpResponseWrapper(response){
+
+ private void ensureNotCommitted() {
+ Asserts.check(!responseCommitted.get(), "Response already committed");
+ }
+
+ @Override
+ public void addHeader(final String name, final Object value) {
+ ensureNotCommitted();
+ super.addHeader(name, value);
+ }
+
+ @Override
+ public void setHeader(final String name, final Object value) {
+ ensureNotCommitted();
+ super.setHeader(name, value);
+ }
+
+ @Override
+ public void setVersion(final ProtocolVersion version) {
+ ensureNotCommitted();
+ super.setVersion(version);
+ }
+
+ @Override
+ public void setCode(final int code) {
+ ensureNotCommitted();
+ super.setCode(code);
+ }
+
+ @Override
+ public void setReasonPhrase(final String reason) {
+ ensureNotCommitted();
+ super.setReasonPhrase(reason);
+ }
+
+ @Override
+ public void setLocale(final Locale locale) {
+ ensureNotCommitted();
+ super.setLocale(locale);
+ }
+
+ };
+
+ final InputStream inputStream;
+ if (entityDetails != null) {
+ inputBuffer = new SharedInputBuffer(initialBufferSize);
+ inputStream = new ContentInputStream(inputBuffer);
+ } else {
+ inputStream = null;
+ }
+ outputBuffer = new SharedOutputBuffer(initialBufferSize);
+
+ final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
+
+ private void triggerResponse() throws IOException {
+ try {
+ if (responseCommitted.compareAndSet(false, true)) {
+ responseChannel.sendResponse(response, new EntityDetails() {
+
+ @Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public String getContentType() {
+ final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+ return h != null ? h.getValue() : null;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+ return h != null ? h.getValue() : null;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return null;
+ }
+
+ }, context);
+ }
+ } catch (final HttpException ex) {
+ throw new IOException(ex.getMessage(), ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ triggerResponse();
+ super.close();
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ triggerResponse();
+ super.write(b, off, len);
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ triggerResponse();
+ super.write(b);
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ triggerResponse();
+ super.write(b);
+ }
+
+ };
+
+ if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handle(request, inputStream, responseWrapper, outputStream, context);
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ outputStream.close();
+ } catch (final Exception ex) {
+ exception.compareAndSet(null, ex);
+ if (inputBuffer != null) {
+ inputBuffer.abort();
+ }
+ outputBuffer.abort();
+ } finally {
+ state.set(State.COMPLETED);
+ }
+ }
+
+ });
+ }
+ }
+
+ @Override
+ public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ if (inputBuffer != null) {
+ inputBuffer.updateCapacity(capacityChannel);
+ }
+ }
+
+ @Override
+ public final int consume(final ByteBuffer src) throws IOException {
+ Asserts.notNull(inputBuffer, "Input buffer");
+ return inputBuffer.fill(src);
+ }
+
+ @Override
+ public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ Asserts.notNull(inputBuffer, "Input buffer");
+ inputBuffer.markEndStream();
+ }
+
+ @Override
+ public final int available() {
+ Asserts.notNull(outputBuffer, "Output buffer");
+ return outputBuffer.length();
+ }
+
+ @Override
+ public final void produce(final DataStreamChannel channel) throws IOException {
+ Asserts.notNull(outputBuffer, "Output buffer");
+ outputBuffer.flush(channel);
+ }
+
+ @Override
+ public final void failed(final Exception cause) {
+ exception.compareAndSet(null, cause);
+ releaseResources();
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
new file mode 100644
index 0000000..c6473e6
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+abstract class AbstractSharedBuffer extends ExpandableBuffer {
+
+ final ReentrantLock lock;
+ final Condition condition;
+
+ volatile boolean endStream;
+ volatile boolean aborted;
+
+ public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(initialBufferSize);
+ this.lock = Args.notNull(lock, "Lock");
+ this.condition = lock.newCondition();
+ }
+
+ @Override
+ public boolean hasData() {
+ lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int capacity() {
+ lock.lock();
+ try {
+ return super.capacity();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int length() {
+ lock.lock();
+ try {
+ return super.length();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void abort() {
+ lock.lock();
+ try {
+ endStream = true;
+ aborted = true;
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void reset() {
+ if (aborted) {
+ return;
+ }
+ lock.lock();
+ try {
+ setInputMode();
+ buffer().clear();
+ endStream = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isEndStream() {
+ lock.lock();
+ try {
+ return endStream && !super.hasData();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
new file mode 100644
index 0000000..2871d5e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
@@ -0,0 +1,84 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+
+/**
+ * Generic content input buffer.
+ *
+ * @since 4.0
+ */
+public interface ContentInputBuffer {
+
+ /**
+ * Return length data stored in the buffer
+ *
+ * @return data length
+ */
+ int length();
+
+ /**
+ * Resets the buffer by clearing its state and stored content.
+ */
+ void reset();
+
+ /**
+ * Reads up to {@code len} bytes of data from this buffer into
+ * an array of bytes. The exact number of bytes read depends how many bytes
+ * are stored in the buffer.
+ *
+ * <p> If {@code off} is negative, or {@code len} is negative, or
+ * {@code off+len} is greater than the length of the array
+ * {@code b}, this method can throw a runtime exception. The exact type
+ * of runtime exception thrown by this method depends on implementation.
+ * This method returns {@code -1} if the end of content stream has been
+ * reached.
+ *
+ * @param b the buffer into which the data is read.
+ * @param off the start offset in array {@code b}
+ * at which the data is written.
+ * @param len the maximum number of bytes to read.
+ * @return the total number of bytes read into the buffer, or
+ * {@code -1} if there is no more data because the end of
+ * the stream has been reached.
+ * @throws IOException if an I/O error occurs.
+ */
+ int read(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Reads one byte from this buffer. If the buffer is empty this method can
+ * throw a runtime exception. The exact type of runtime exception thrown
+ * by this method depends on implementation. This method returns
+ * {@code -1} if the end of content stream has been reached.
+ *
+ * @return one byte
+ */
+ int read() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
new file mode 100644
index 0000000..f3e567b
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
@@ -0,0 +1,82 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link InputStream} adaptor for {@link ContentInputBuffer}.
+ *
+ * @since 4.0
+ */
+public class ContentInputStream extends InputStream {
+
+ private final ContentInputBuffer buffer;
+
+ public ContentInputStream(final ContentInputBuffer buffer) {
+ super();
+ Args.notNull(buffer, "Input buffer");
+ this.buffer = buffer;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return this.buffer.length();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return this.buffer.read(b, off, len);
+ }
+
+ @Override
+ public int read(final byte[] b) throws IOException {
+ if (b == null) {
+ return 0;
+ }
+ return this.buffer.read(b, 0, b.length);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return this.buffer.read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // read and discard the remainder of the message
+ final byte[] tmp = new byte[1024];
+ while (this.buffer.read(tmp, 0, tmp.length) >= 0) {
+ }
+ super.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
new file mode 100644
index 0000000..a9f645e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
@@ -0,0 +1,81 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+
+/**
+ * Generic content output buffer.
+ *
+ * @since 4.0
+ */
+public interface ContentOutputBuffer {
+
+ /**
+ * Return length data stored in the buffer
+ *
+ * @return data length
+ */
+ int length();
+
+ /**
+ * Resets the buffer by clearing its state and stored content.
+ */
+ void reset();
+
+ /**
+ * Writes {@code len} bytes from the specified byte array
+ * starting at offset {@code off} to this buffer.
+ * <p>
+ * If {@code off} is negative, or {@code len} is negative, or
+ * {@code off+len} is greater than the length of the array
+ * {@code b}, this method can throw a runtime exception. The exact type
+ * of runtime exception thrown by this method depends on implementation.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @throws IOException if an I/O error occurs.
+ */
+ void write(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Writes the specified byte to this buffer.
+ *
+ * @param b the {@code byte}.
+ * @throws IOException if an I/O error occurs.
+ */
+ void write(int b) throws IOException;
+
+ /**
+ * Indicates the content has been fully written.
+ * @throws IOException if an I/O error occurs.
+ */
+ void writeCompleted() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
new file mode 100644
index 0000000..b01e99d
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
@@ -0,0 +1,77 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link OutputStream} adaptor for {@link ContentOutputBuffer}.
+ *
+ * @since 4.0
+ */
+public class ContentOutputStream extends OutputStream {
+
+ private final ContentOutputBuffer buffer;
+
+ public ContentOutputStream(final ContentOutputBuffer buffer) {
+ super();
+ Args.notNull(buffer, "Output buffer");
+ this.buffer = buffer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.buffer.writeCompleted();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ this.buffer.write(b, off, len);
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ if (b == null) {
+ return;
+ }
+ this.buffer.write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ this.buffer.write(b);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
new file mode 100644
index 0000000..d4497fd
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
@@ -0,0 +1,163 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
+
+ private volatile CapacityChannel capacityChannel;
+
+ public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(lock, initialBufferSize);
+ }
+
+ public SharedInputBuffer(final int bufferSize) {
+ super(new ReentrantLock(), bufferSize);
+ }
+
+ public int fill(final ByteBuffer src) throws IOException {
+ lock.lock();
+ try {
+ setInputMode();
+ ensureCapacity(buffer().position() + src.remaining());
+ buffer().put(src);
+ final int remaining = buffer().remaining();
+ condition.signalAll();
+ return remaining;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ lock.lock();
+ try {
+ this.capacityChannel = capacityChannel;
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void awaitInput() throws InterruptedIOException {
+ if (!buffer().hasRemaining()) {
+ setInputMode();
+ while (buffer().position() == 0 && !endStream && !aborted) {
+ try {
+ condition.await();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(ex.getMessage());
+ }
+ }
+ setOutputMode();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ lock.lock();
+ try {
+ setOutputMode();
+ awaitInput();
+ if (aborted) {
+ return -1;
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ return -1;
+ }
+ final int b = buffer().get() & 0xff;
+ if (!buffer().hasRemaining() && capacityChannel != null) {
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ }
+ return b;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ lock.lock();
+ try {
+ setOutputMode();
+ awaitInput();
+ if (aborted) {
+ return -1;
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ return -1;
+ }
+ final int chunk = Math.min(buffer().remaining(), len);
+ buffer().get(b, off, chunk);
+ if (!buffer().hasRemaining() && capacityChannel != null) {
+ setInputMode();
+ if (buffer().hasRemaining()) {
+ capacityChannel.update(buffer().remaining());
+ }
+ }
+ return chunk;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void markEndStream() throws IOException {
+ if (endStream) {
+ return;
+ }
+ lock.lock();
+ try {
+ if (!endStream) {
+ endStream = true;
+ capacityChannel = null;
+ condition.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
new file mode 100644
index 0000000..ee494f4
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
@@ -0,0 +1,165 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
+
+ private volatile DataStreamChannel dataStreamChannel;
+ private volatile boolean hasCapacity;
+
+ public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+ super(lock, initialBufferSize);
+ this.hasCapacity = false;
+ }
+
+ public SharedOutputBuffer(final int bufferSize) {
+ this(new ReentrantLock(), bufferSize);
+ }
+
+ public void flush(final DataStreamChannel channel) throws IOException {
+ lock.lock();
+ try {
+ dataStreamChannel = channel;
+ hasCapacity = true;
+ setOutputMode();
+ if (buffer().hasRemaining()) {
+ dataStreamChannel.write(buffer());
+ }
+ if (!buffer().hasRemaining() && endStream) {
+ dataStreamChannel.endStream();
+ }
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void ensureNotAborted() throws InterruptedIOException {
+ if (aborted) {
+ throw new InterruptedIOException("Operation aborted");
+ }
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ final ByteBuffer src = ByteBuffer.wrap(b, off, len);
+ lock.lock();
+ try {
+ ensureNotAborted();
+ setInputMode();
+ while (src.hasRemaining()) {
+ // always buffer small chunks
+ if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
+ buffer().put(src);
+ } else {
+ if (buffer().position() > 0 || dataStreamChannel == null) {
+ waitFlush();
+ }
+ if (buffer().position() == 0 && dataStreamChannel != null) {
+ final int bytesWritten = dataStreamChannel.write(src);
+ if (bytesWritten == 0) {
+ hasCapacity = false;
+ waitFlush();
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ lock.lock();
+ try {
+ ensureNotAborted();
+ setInputMode();
+ if (!buffer().hasRemaining()) {
+ waitFlush();
+ }
+ buffer().put((byte)b);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void writeCompleted() throws IOException {
+ if (endStream) {
+ return;
+ }
+ lock.lock();
+ try {
+ if (!endStream) {
+ endStream = true;
+ if (dataStreamChannel != null) {
+ setOutputMode();
+ if (buffer().hasRemaining()) {
+ dataStreamChannel.requestOutput();
+ } else {
+ dataStreamChannel.endStream();
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void waitFlush() throws InterruptedIOException {
+ setOutputMode();
+ if (dataStreamChannel != null) {
+ dataStreamChannel.requestOutput();
+ }
+ ensureNotAborted();
+ while (buffer().hasRemaining() || !hasCapacity) {
+ try {
+ condition.await();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(ex.getMessage());
+ }
+ ensureNotAborted();
+ }
+ setInputMode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java b/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
index 99b001c..0137607 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
@@ -107,6 +107,14 @@ public class WritableByteChannelMock implements WritableByteChannel {
this.buf.clear();
}
+ public byte[] toByteArray() {
+ final ByteBuffer dup = this.buf.duplicate();
+ dup.flip();
+ final byte[] bytes = new byte[dup.remaining()];
+ dup.get(bytes);
+ return bytes;
+ }
+
public String dump(final Charset charset) throws CharacterCodingException {
this.buf.flip();
final CharBuffer charBuffer = charset.newDecoder().decode(this.buf);
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
new file mode 100644
index 0000000..df1c9d3
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
@@ -0,0 +1,243 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class TestSharedInputBuffer {
+
+ @Test
+ public void testBasis() throws Exception {
+
+ final Charset charset = StandardCharsets.US_ASCII;
+ final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+ inputBuffer.fill(charset.encode("1234567890"));
+ Assert.assertEquals(10, inputBuffer.length());
+
+ final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+ inputBuffer.updateCapacity(capacityChannel);
+ Mockito.verifyZeroInteractions(capacityChannel);
+
+ inputBuffer.fill(charset.encode("1234567890"));
+ inputBuffer.fill(charset.encode("1234567890"));
+ Assert.assertEquals(30, inputBuffer.length());
+
+ Mockito.verifyZeroInteractions(capacityChannel);
+
+ final byte[] tmp = new byte[20];
+ final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
+ Assert.assertEquals(20, bytesRead1);
+ Mockito.verifyZeroInteractions(capacityChannel);
+
+ inputBuffer.markEndStream();
+
+ Assert.assertEquals('1', inputBuffer.read());
+ Assert.assertEquals('2', inputBuffer.read());
+ final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
+ Assert.assertEquals(8, bytesRead2);
+ Mockito.verifyZeroInteractions(capacityChannel);
+ Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
+ Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
+ Assert.assertEquals(-1, inputBuffer.read());
+ Assert.assertEquals(-1, inputBuffer.read());
+ }
+
+ @Test
+ public void testMultithreadingRead() throws Exception {
+
+ final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+ final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+ inputBuffer.updateCapacity(capacityChannel);
+ Mockito.verify(capacityChannel).update(10);
+ Mockito.reset(capacityChannel);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ final Charset charset = StandardCharsets.US_ASCII;
+ inputBuffer.fill(charset.encode("1234567890"));
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ final byte[] tmp = new byte[20];
+ return inputBuffer.read(tmp, 0, tmp.length);
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+ Assert.assertEquals(Integer.valueOf(10), task2.get(5, TimeUnit.SECONDS));
+ Mockito.verify(capacityChannel).update(10);
+ }
+
+ @Test
+ public void testMultithreadingSingleRead() throws Exception {
+
+ final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+ final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+ inputBuffer.updateCapacity(capacityChannel);
+ Mockito.verify(capacityChannel).update(10);
+ Mockito.reset(capacityChannel);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ final Charset charset = StandardCharsets.US_ASCII;
+ inputBuffer.fill(charset.encode("a"));
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return inputBuffer.read();
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+ Assert.assertEquals(Integer.valueOf('a'), task2.get(5, TimeUnit.SECONDS));
+ Mockito.verify(capacityChannel).update(10);
+ }
+
+ @Test
+ public void testMultithreadingReadStream() throws Exception {
+
+ final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+ final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+ inputBuffer.updateCapacity(capacityChannel);
+ Mockito.verify(capacityChannel).update(10);
+ Mockito.reset(capacityChannel);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ final Charset charset = StandardCharsets.US_ASCII;
+ final Random rnd = new Random(System.currentTimeMillis());
+ for (int i = 0; i < 5; i++) {
+ inputBuffer.fill(charset.encode("1234567890"));
+ Thread.sleep(rnd.nextInt(250));
+ }
+ inputBuffer.markEndStream();
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<String> task2 = executorService.submit(new Callable<String>() {
+
+ @Override
+ public String call() throws Exception {
+ final Charset charset = StandardCharsets.US_ASCII;
+ final StringBuilder buf = new StringBuilder();
+ final byte[] tmp = new byte[10];
+ int l;
+ while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
+ buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
+ }
+ return buf.toString();
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+ Assert.assertEquals("12345678901234567890123456789012345678901234567890", task2.get(5, TimeUnit.SECONDS));
+ Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
+ }
+
+ @Test
+ public void testMultithreadingReadStreamAbort() throws Exception {
+
+ final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+ final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+ inputBuffer.updateCapacity(capacityChannel);
+ Mockito.verify(capacityChannel).update(10);
+ Mockito.reset(capacityChannel);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ Thread.sleep(1000);
+ inputBuffer.abort();
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ return inputBuffer.read();
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+ Assert.assertEquals(Integer.valueOf(-1), task2.get(5, TimeUnit.SECONDS));
+ Mockito.verify(capacityChannel, Mockito.never()).update(10);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
new file mode 100644
index 0000000..278d1fd
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
@@ -0,0 +1,231 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.WritableByteChannelMock;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSharedOutputBuffer {
+
+ static class DataStreamChannelMock implements DataStreamChannel {
+
+ private final WritableByteChannelMock channel;
+
+ DataStreamChannelMock(final WritableByteChannelMock channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public synchronized int write(final ByteBuffer src) throws IOException {
+ return channel.write(src);
+ }
+
+ @Override
+ public synchronized void requestOutput() {
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
+ channel.close();
+ notifyAll();
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ endStream(null);
+ }
+
+ public synchronized void awaitOutputRequest() throws InterruptedException {
+ wait();
+ }
+
+ }
+
+ @Test
+ public void testBasis() throws Exception {
+
+ final Charset charset = StandardCharsets.US_ASCII;
+ final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
+
+ final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+ final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
+ outputBuffer.flush(dataStreamChannel);
+
+ Mockito.verifyZeroInteractions(dataStreamChannel);
+
+ Assert.assertEquals(0, outputBuffer.length());
+ Assert.assertEquals(30, outputBuffer.capacity());
+
+ final byte[] tmp = "1234567890".getBytes(charset);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write('1');
+ outputBuffer.write('2');
+
+ Assert.assertEquals(22, outputBuffer.length());
+ Assert.assertEquals(8, outputBuffer.capacity());
+
+ Mockito.verifyZeroInteractions(dataStreamChannel);
+ }
+
+ @Test
+ public void testFlush() throws Exception {
+
+ final Charset charset = StandardCharsets.US_ASCII;
+ final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
+
+ final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+ final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
+ outputBuffer.flush(dataStreamChannel);
+
+ Assert.assertEquals(0, outputBuffer.length());
+ Assert.assertEquals(30, outputBuffer.capacity());
+
+ final byte[] tmp = "1234567890".getBytes(charset);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write('1');
+ outputBuffer.write('2');
+
+ outputBuffer.flush(dataStreamChannel);
+
+ Assert.assertEquals(0, outputBuffer.length());
+ Assert.assertEquals(30, outputBuffer.capacity());
+ }
+
+ @Test
+ public void testMultithreadingWriteStream() throws Exception {
+
+ final Charset charset = StandardCharsets.US_ASCII;
+ final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
+
+ final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+ final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ final byte[] tmp = "1234567890".getBytes(charset);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write('1');
+ outputBuffer.write('2');
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.write(tmp, 0, tmp.length);
+ outputBuffer.writeCompleted();
+ outputBuffer.writeCompleted();
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ for (;;) {
+ outputBuffer.flush(dataStreamChannel);
+ if (outputBuffer.isEndStream()) {
+ break;
+ }
+ if (!outputBuffer.hasData()) {
+ dataStreamChannel.awaitOutputRequest();
+ }
+ }
+ return Boolean.TRUE;
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+ Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
+
+ Assert.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
+ }
+
+ @Test
+ public void testMultithreadingWriteStreamAbort() throws Exception {
+
+ final Charset charset = StandardCharsets.US_ASCII;
+ final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+ final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ final byte[] tmp = "1234567890".getBytes(charset);
+ for (int i = 0; i < 20; i++) {
+ outputBuffer.write(tmp, 0, tmp.length);
+ }
+ outputBuffer.writeCompleted();
+ return Boolean.TRUE;
+ }
+
+ });
+ final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ Thread.sleep(200);
+ outputBuffer.abort();
+ return Boolean.TRUE;
+ }
+
+ });
+
+ Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
+ try {
+ task1.get(5, TimeUnit.SECONDS);
+ } catch (final ExecutionException ex) {
+ Assert.assertTrue(ex.getCause() instanceof InterruptedIOException);
+ }
+ }
+
+}
+