You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/15 17:48:36 UTC
[2/2] httpcomponents-core git commit: Moved classes (no functional
changes)
Moved classes (no functional changes)
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/cafa9bb4
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/cafa9bb4
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/cafa9bb4
Branch: refs/heads/master
Commit: cafa9bb4d1a90709d1d626878863f0a8d45623e1
Parents: fc6be3e
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Wed Aug 15 19:47:50 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Wed Aug 15 19:47:50 2018 +0200
----------------------------------------------------------------------
.../impl/nio/entity/TestSharedInputBuffer.java | 244 ----------------
.../impl/nio/entity/TestSharedOutputBuffer.java | 232 ---------------
.../core5/testing/nio/Http1IntegrationTest.java | 6 +-
.../core5/testing/nio/Http2IntegrationTest.java | 6 +-
.../entity/AbstractClassicEntityConsumer.java | 134 ---------
.../entity/AbstractClassicEntityProducer.java | 135 ---------
.../http/nio/entity/AbstractSharedBuffer.java | 119 --------
.../http/nio/entity/ContentInputBuffer.java | 84 ------
.../http/nio/entity/ContentInputStream.java | 82 ------
.../http/nio/entity/ContentOutputBuffer.java | 81 ------
.../http/nio/entity/ContentOutputStream.java | 77 -----
.../http/nio/entity/SharedInputBuffer.java | 163 -----------
.../http/nio/entity/SharedOutputBuffer.java | 165 -----------
.../AbstractClassicServerExchangeHandler.java | 289 -------------------
.../classic/AbstractClassicEntityConsumer.java | 134 +++++++++
.../classic/AbstractClassicEntityProducer.java | 135 +++++++++
.../AbstractClassicServerExchangeHandler.java | 285 ++++++++++++++++++
.../support/classic/AbstractSharedBuffer.java | 119 ++++++++
.../nio/support/classic/ContentInputBuffer.java | 84 ++++++
.../nio/support/classic/ContentInputStream.java | 82 ++++++
.../support/classic/ContentOutputBuffer.java | 81 ++++++
.../support/classic/ContentOutputStream.java | 77 +++++
.../nio/support/classic/SharedInputBuffer.java | 163 +++++++++++
.../nio/support/classic/SharedOutputBuffer.java | 165 +++++++++++
.../hc/core5/http/WritableByteChannelMock.java | 8 +
.../support/classic/TestSharedInputBuffer.java | 243 ++++++++++++++++
.../support/classic/TestSharedOutputBuffer.java | 231 +++++++++++++++
27 files changed, 1813 insertions(+), 1811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedInputBuffer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedInputBuffer.java
deleted file mode 100644
index 7c259c7..0000000
--- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedInputBuffer.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http2.impl.nio.entity;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hc.core5.http.nio.entity.SharedInputBuffer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-
-public class TestSharedInputBuffer {
-
- @Test
- public void testBasis() throws Exception {
-
- final Charset charset = StandardCharsets.US_ASCII;
- final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
- inputBuffer.fill(charset.encode("1234567890"));
- Assert.assertEquals(10, inputBuffer.length());
-
- final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
-
- inputBuffer.updateCapacity(capacityChannel);
- Mockito.verifyZeroInteractions(capacityChannel);
-
- inputBuffer.fill(charset.encode("1234567890"));
- inputBuffer.fill(charset.encode("1234567890"));
- Assert.assertEquals(30, inputBuffer.length());
-
- Mockito.verifyZeroInteractions(capacityChannel);
-
- final byte[] tmp = new byte[20];
- final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
- Assert.assertEquals(20, bytesRead1);
- Mockito.verifyZeroInteractions(capacityChannel);
-
- inputBuffer.markEndStream();
-
- Assert.assertEquals('1', inputBuffer.read());
- Assert.assertEquals('2', inputBuffer.read());
- final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
- Assert.assertEquals(8, bytesRead2);
- Mockito.verifyZeroInteractions(capacityChannel);
- Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
- Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
- Assert.assertEquals(-1, inputBuffer.read());
- Assert.assertEquals(-1, inputBuffer.read());
- }
-
- @Test
- public void testMultithreadingRead() throws Exception {
-
- final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
-
- final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
-
- inputBuffer.updateCapacity(capacityChannel);
- Mockito.verify(capacityChannel).update(10);
- Mockito.reset(capacityChannel);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- final Charset charset = StandardCharsets.US_ASCII;
- inputBuffer.fill(charset.encode("1234567890"));
- return Boolean.TRUE;
- }
-
- });
- final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
-
- @Override
- public Integer call() throws Exception {
- final byte[] tmp = new byte[20];
- return inputBuffer.read(tmp, 0, tmp.length);
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
- Assert.assertEquals(Integer.valueOf(10), task2.get(5, TimeUnit.SECONDS));
- Mockito.verify(capacityChannel).update(10);
- }
-
- @Test
- public void testMultithreadingSingleRead() throws Exception {
-
- final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
-
- final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
-
- inputBuffer.updateCapacity(capacityChannel);
- Mockito.verify(capacityChannel).update(10);
- Mockito.reset(capacityChannel);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- final Charset charset = StandardCharsets.US_ASCII;
- inputBuffer.fill(charset.encode("a"));
- return Boolean.TRUE;
- }
-
- });
- final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
-
- @Override
- public Integer call() throws Exception {
- return inputBuffer.read();
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
- Assert.assertEquals(Integer.valueOf('a'), task2.get(5, TimeUnit.SECONDS));
- Mockito.verify(capacityChannel).update(10);
- }
-
- @Test
- public void testMultithreadingReadStream() throws Exception {
-
- final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
-
- final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
-
- inputBuffer.updateCapacity(capacityChannel);
- Mockito.verify(capacityChannel).update(10);
- Mockito.reset(capacityChannel);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- final Charset charset = StandardCharsets.US_ASCII;
- final Random rnd = new Random(System.currentTimeMillis());
- for (int i = 0; i < 5; i++) {
- inputBuffer.fill(charset.encode("1234567890"));
- Thread.sleep(rnd.nextInt(250));
- }
- inputBuffer.markEndStream();
- return Boolean.TRUE;
- }
-
- });
- final Future<String> task2 = executorService.submit(new Callable<String>() {
-
- @Override
- public String call() throws Exception {
- final Charset charset = StandardCharsets.US_ASCII;
- final StringBuilder buf = new StringBuilder();
- final byte[] tmp = new byte[10];
- int l;
- while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
- buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
- }
- return buf.toString();
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
- Assert.assertEquals("12345678901234567890123456789012345678901234567890", task2.get(5, TimeUnit.SECONDS));
- Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
- }
-
- @Test
- public void testMultithreadingReadStreamAbort() throws Exception {
-
- final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
-
- final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
-
- inputBuffer.updateCapacity(capacityChannel);
- Mockito.verify(capacityChannel).update(10);
- Mockito.reset(capacityChannel);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- Thread.sleep(1000);
- inputBuffer.abort();
- return Boolean.TRUE;
- }
-
- });
- final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
-
- @Override
- public Integer call() throws Exception {
- return inputBuffer.read();
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
- Assert.assertEquals(Integer.valueOf(-1), task2.get(5, TimeUnit.SECONDS));
- Mockito.verify(capacityChannel, Mockito.never()).update(10);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedOutputBuffer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedOutputBuffer.java
deleted file mode 100644
index 86af685..0000000
--- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/entity/TestSharedOutputBuffer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http2.impl.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.nio.entity.SharedOutputBuffer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.WritableByteChannelMock;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestSharedOutputBuffer {
-
- static class DataStreamChannelMock implements DataStreamChannel {
-
- private final WritableByteChannelMock channel;
-
- DataStreamChannelMock(final WritableByteChannelMock channel) {
- this.channel = channel;
- }
-
- @Override
- public synchronized int write(final ByteBuffer src) throws IOException {
- return channel.write(src);
- }
-
- @Override
- public synchronized void requestOutput() {
- notifyAll();
- }
-
- @Override
- public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
- channel.close();
- notifyAll();
- }
-
- @Override
- public void endStream() throws IOException {
- endStream(null);
- }
-
- public synchronized void awaitOutputRequest() throws InterruptedException {
- wait();
- }
-
- }
-
- @Test
- public void testBasis() throws Exception {
-
- final Charset charset = StandardCharsets.US_ASCII;
- final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
-
- final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
- final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
- outputBuffer.flush(dataStreamChannel);
-
- Mockito.verifyZeroInteractions(dataStreamChannel);
-
- Assert.assertEquals(0, outputBuffer.length());
- Assert.assertEquals(30, outputBuffer.capacity());
-
- final byte[] tmp = "1234567890".getBytes(charset);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write('1');
- outputBuffer.write('2');
-
- Assert.assertEquals(22, outputBuffer.length());
- Assert.assertEquals(8, outputBuffer.capacity());
-
- Mockito.verifyZeroInteractions(dataStreamChannel);
- }
-
- @Test
- public void testFlush() throws Exception {
-
- final Charset charset = StandardCharsets.US_ASCII;
- final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
-
- final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
- final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
- outputBuffer.flush(dataStreamChannel);
-
- Assert.assertEquals(0, outputBuffer.length());
- Assert.assertEquals(30, outputBuffer.capacity());
-
- final byte[] tmp = "1234567890".getBytes(charset);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write('1');
- outputBuffer.write('2');
-
- outputBuffer.flush(dataStreamChannel);
-
- Assert.assertEquals(0, outputBuffer.length());
- Assert.assertEquals(30, outputBuffer.capacity());
- }
-
- @Test
- public void testMultithreadingWriteStream() throws Exception {
-
- final Charset charset = StandardCharsets.US_ASCII;
- final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
-
- final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
- final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- final byte[] tmp = "1234567890".getBytes(charset);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write('1');
- outputBuffer.write('2');
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.write(tmp, 0, tmp.length);
- outputBuffer.writeCompleted();
- outputBuffer.writeCompleted();
- return Boolean.TRUE;
- }
-
- });
- final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- for (;;) {
- outputBuffer.flush(dataStreamChannel);
- if (outputBuffer.isEndStream()) {
- break;
- }
- if (!outputBuffer.hasData()) {
- dataStreamChannel.awaitOutputRequest();
- }
- }
- return Boolean.TRUE;
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
- Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
-
- Assert.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
- }
-
- @Test
- public void testMultithreadingWriteStreamAbort() throws Exception {
-
- final Charset charset = StandardCharsets.US_ASCII;
- final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
-
- final ExecutorService executorService = Executors.newFixedThreadPool(2);
- final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- final byte[] tmp = "1234567890".getBytes(charset);
- for (int i = 0; i < 20; i++) {
- outputBuffer.write(tmp, 0, tmp.length);
- }
- outputBuffer.writeCompleted();
- return Boolean.TRUE;
- }
-
- });
- final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- Thread.sleep(200);
- outputBuffer.abort();
- return Boolean.TRUE;
- }
-
- });
-
- Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
- try {
- task1.get(5, TimeUnit.SECONDS);
- } catch (final ExecutionException ex) {
- Assert.assertTrue(ex.getCause() instanceof InterruptedIOException);
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
index 565023a..2e885d6 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java
@@ -104,16 +104,16 @@ import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
-import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityConsumer;
-import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
-import org.apache.hc.core5.http.nio.support.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
index 6f5cf2d..454d14d 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2IntegrationTest.java
@@ -93,8 +93,6 @@ import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.ResponseChannel;
-import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityConsumer;
-import org.apache.hc.core5.http.nio.entity.AbstractClassicEntityProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
@@ -102,9 +100,11 @@ import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
-import org.apache.hc.core5.http.nio.support.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
+import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
deleted file mode 100644
index afd75ab..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityConsumer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
-
- private enum State { IDLE, ACTIVE, COMPLETED }
-
- private final Executor executor;
- private final SharedInputBuffer buffer;
- private final AtomicReference<State> state;
- private final AtomicReference<T> resultRef;
- private final AtomicReference<Exception> exceptionRef;
-
- public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
- this.executor = Args.notNull(executor, "Executor");
- this.buffer = new SharedInputBuffer(initialBufferSize);
- this.state = new AtomicReference<>(State.IDLE);
- this.resultRef = new AtomicReference<>(null);
- this.exceptionRef = new AtomicReference<>(null);
- }
-
- protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
-
- @Override
- public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- buffer.updateCapacity(capacityChannel);
- }
-
- @Override
- public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
- final ContentType contentType;
- try {
- contentType = ContentType.parse(entityDetails.getContentType());
- } catch (final UnsupportedCharsetException ex) {
- throw new UnsupportedEncodingException(ex.getMessage());
- }
- if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- final T result = consumeData(contentType, new ContentInputStream(buffer));
- resultRef.set(result);
- resultCallback.completed(result);
- } catch (final Exception ex) {
- buffer.abort();
- resultCallback.failed(ex);
- } finally {
- state.set(State.COMPLETED);
- }
- }
-
- });
- }
- }
-
- @Override
- public final int consume(final ByteBuffer src) throws IOException {
- return buffer.fill(src);
- }
-
- @Override
- public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- buffer.markEndStream();
- }
-
- @Override
- public final void failed(final Exception cause) {
- if (exceptionRef.compareAndSet(null, cause)) {
- releaseResources();
- }
- }
-
- public final Exception getException() {
- return exceptionRef.get();
- }
-
- @Override
- public final T getContent() {
- return resultRef.get();
- }
-
- @Override
- public void releaseResources() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
deleted file mode 100644
index b582987..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractClassicEntityProducer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.nio.AsyncEntityProducer;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
-
- private enum State { IDLE, ACTIVE, COMPLETED }
-
- private final SharedOutputBuffer buffer;
- private final ContentType contentType;
- private final Executor executor;
- private final AtomicReference<State> state;
- private final AtomicReference<Exception> exception;
-
- public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
- this.buffer = new SharedOutputBuffer(initialBufferSize);
- this.contentType = contentType;
- this.executor = Args.notNull(executor, "Executor");
- this.state = new AtomicReference<>(State.IDLE);
- this.exception = new AtomicReference<>(null);
- }
-
- @Override
- public final boolean isRepeatable() {
- return false;
- }
-
- protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException;
-
- @Override
- public final int available() {
- return buffer.length();
- }
-
- @Override
- public final void produce(final DataStreamChannel channel) throws IOException {
- if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- produceData(contentType, new ContentOutputStream(buffer));
- buffer.writeCompleted();
- } catch (final Exception ex) {
- buffer.abort();
- } finally {
- state.set(State.COMPLETED);
- }
- }
-
- });
- }
- buffer.flush(channel);
- }
-
- @Override
- public final long getContentLength() {
- return -1;
- }
-
- @Override
- public final String getContentType() {
- return contentType != null ? contentType.toString() : null;
- }
-
- @Override
- public String getContentEncoding() {
- return null;
- }
-
- @Override
- public final boolean isChunked() {
- return false;
- }
-
- @Override
- public final Set<String> getTrailerNames() {
- return null;
- }
-
- @Override
- public final void failed(final Exception cause) {
- if (exception.compareAndSet(null, cause)) {
- releaseResources();
- }
- }
-
- public final Exception getException() {
- return exception.get();
- }
-
- @Override
- public void releaseResources() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
deleted file mode 100644
index 5339580..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractSharedBuffer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.entity;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
-import org.apache.hc.core5.util.Args;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-abstract class AbstractSharedBuffer extends ExpandableBuffer {
-
- final ReentrantLock lock;
- final Condition condition;
-
- volatile boolean endStream;
- volatile boolean aborted;
-
- public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(initialBufferSize);
- this.lock = Args.notNull(lock, "Lock");
- this.condition = lock.newCondition();
- }
-
- @Override
- public boolean hasData() {
- lock.lock();
- try {
- return super.hasData();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int capacity() {
- lock.lock();
- try {
- return super.capacity();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int length() {
- lock.lock();
- try {
- return super.length();
- } finally {
- lock.unlock();
- }
- }
-
- public void abort() {
- lock.lock();
- try {
- endStream = true;
- aborted = true;
- condition.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- public void reset() {
- if (aborted) {
- return;
- }
- lock.lock();
- try {
- setInputMode();
- buffer().clear();
- endStream = false;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isEndStream() {
- lock.lock();
- try {
- return endStream && !super.hasData();
- } finally {
- lock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputBuffer.java
deleted file mode 100644
index c730ab8..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputBuffer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-
-/**
- * Generic content input buffer.
- *
- * @since 4.0
- */
-public interface ContentInputBuffer {
-
- /**
- * Return length data stored in the buffer
- *
- * @return data length
- */
- int length();
-
- /**
- * Resets the buffer by clearing its state and stored content.
- */
- void reset();
-
- /**
- * Reads up to {@code len} bytes of data from this buffer into
- * an array of bytes. The exact number of bytes read depends how many bytes
- * are stored in the buffer.
- *
- * <p> If {@code off} is negative, or {@code len} is negative, or
- * {@code off+len} is greater than the length of the array
- * {@code b}, this method can throw a runtime exception. The exact type
- * of runtime exception thrown by this method depends on implementation.
- * This method returns {@code -1} if the end of content stream has been
- * reached.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset in array {@code b}
- * at which the data is written.
- * @param len the maximum number of bytes to read.
- * @return the total number of bytes read into the buffer, or
- * {@code -1} if there is no more data because the end of
- * the stream has been reached.
- * @throws IOException if an I/O error occurs.
- */
- int read(byte[] b, int off, int len) throws IOException;
-
- /**
- * Reads one byte from this buffer. If the buffer is empty this method can
- * throw a runtime exception. The exact type of runtime exception thrown
- * by this method depends on implementation. This method returns
- * {@code -1} if the end of content stream has been reached.
- *
- * @return one byte
- */
- int read() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputStream.java
deleted file mode 100644
index 8932d39..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentInputStream.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hc.core5.util.Args;
-
-/**
- * {@link InputStream} adaptor for {@link ContentInputBuffer}.
- *
- * @since 4.0
- */
-public class ContentInputStream extends InputStream {
-
- private final ContentInputBuffer buffer;
-
- public ContentInputStream(final ContentInputBuffer buffer) {
- super();
- Args.notNull(buffer, "Input buffer");
- this.buffer = buffer;
- }
-
- @Override
- public int available() throws IOException {
- return this.buffer.length();
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- return this.buffer.read(b, off, len);
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- if (b == null) {
- return 0;
- }
- return this.buffer.read(b, 0, b.length);
- }
-
- @Override
- public int read() throws IOException {
- return this.buffer.read();
- }
-
- @Override
- public void close() throws IOException {
- // read and discard the remainder of the message
- final byte[] tmp = new byte[1024];
- while (this.buffer.read(tmp, 0, tmp.length) >= 0) {
- }
- super.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputBuffer.java
deleted file mode 100644
index 905f799..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputBuffer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-
-/**
- * Generic content output buffer.
- *
- * @since 4.0
- */
-public interface ContentOutputBuffer {
-
- /**
- * Return length data stored in the buffer
- *
- * @return data length
- */
- int length();
-
- /**
- * Resets the buffer by clearing its state and stored content.
- */
- void reset();
-
- /**
- * Writes {@code len} bytes from the specified byte array
- * starting at offset {@code off} to this buffer.
- * <p>
- * If {@code off} is negative, or {@code len} is negative, or
- * {@code off+len} is greater than the length of the array
- * {@code b}, this method can throw a runtime exception. The exact type
- * of runtime exception thrown by this method depends on implementation.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @throws IOException if an I/O error occurs.
- */
- void write(byte[] b, int off, int len) throws IOException;
-
- /**
- * Writes the specified byte to this buffer.
- *
- * @param b the {@code byte}.
- * @throws IOException if an I/O error occurs.
- */
- void write(int b) throws IOException;
-
- /**
- * Indicates the content has been fully written.
- * @throws IOException if an I/O error occurs.
- */
- void writeCompleted() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputStream.java
deleted file mode 100644
index 1b8b786..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/ContentOutputStream.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hc.core5.util.Args;
-
-/**
- * {@link OutputStream} adaptor for {@link ContentOutputBuffer}.
- *
- * @since 4.0
- */
-public class ContentOutputStream extends OutputStream {
-
- private final ContentOutputBuffer buffer;
-
- public ContentOutputStream(final ContentOutputBuffer buffer) {
- super();
- Args.notNull(buffer, "Output buffer");
- this.buffer = buffer;
- }
-
- @Override
- public void close() throws IOException {
- this.buffer.writeCompleted();
- }
-
- @Override
- public void flush() throws IOException {
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len) throws IOException {
- this.buffer.write(b, off, len);
- }
-
- @Override
- public void write(final byte[] b) throws IOException {
- if (b == null) {
- return;
- }
- this.buffer.write(b, 0, b.length);
- }
-
- @Override
- public void write(final int b) throws IOException {
- this.buffer.write(b);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
deleted file mode 100644
index 24abeda..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedInputBuffer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
-
- private volatile CapacityChannel capacityChannel;
-
- public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(lock, initialBufferSize);
- }
-
- public SharedInputBuffer(final int bufferSize) {
- super(new ReentrantLock(), bufferSize);
- }
-
- public int fill(final ByteBuffer src) throws IOException {
- lock.lock();
- try {
- setInputMode();
- ensureCapacity(buffer().position() + src.remaining());
- buffer().put(src);
- final int remaining = buffer().remaining();
- condition.signalAll();
- return remaining;
- } finally {
- lock.unlock();
- }
- }
-
- public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- lock.lock();
- try {
- this.capacityChannel = capacityChannel;
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void awaitInput() throws InterruptedIOException {
- if (!buffer().hasRemaining()) {
- setInputMode();
- while (buffer().position() == 0 && !endStream && !aborted) {
- try {
- condition.await();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(ex.getMessage());
- }
- }
- setOutputMode();
- }
- }
-
- @Override
- public int read() throws IOException {
- lock.lock();
- try {
- setOutputMode();
- awaitInput();
- if (aborted) {
- return -1;
- }
- if (!buffer().hasRemaining() && endStream) {
- return -1;
- }
- final int b = buffer().get() & 0xff;
- if (!buffer().hasRemaining() && capacityChannel != null) {
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- }
- return b;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- lock.lock();
- try {
- setOutputMode();
- awaitInput();
- if (aborted) {
- return -1;
- }
- if (!buffer().hasRemaining() && endStream) {
- return -1;
- }
- final int chunk = Math.min(buffer().remaining(), len);
- buffer().get(b, off, chunk);
- if (!buffer().hasRemaining() && capacityChannel != null) {
- setInputMode();
- if (buffer().hasRemaining()) {
- capacityChannel.update(buffer().remaining());
- }
- }
- return chunk;
- } finally {
- lock.unlock();
- }
- }
-
- public void markEndStream() throws IOException {
- if (endStream) {
- return;
- }
- lock.lock();
- try {
- if (!endStream) {
- endStream = true;
- capacityChannel = null;
- condition.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
deleted file mode 100644
index 701ad87..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/SharedOutputBuffer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.entity;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
-
- private volatile DataStreamChannel dataStreamChannel;
- private volatile boolean hasCapacity;
-
- public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
- super(lock, initialBufferSize);
- this.hasCapacity = false;
- }
-
- public SharedOutputBuffer(final int bufferSize) {
- this(new ReentrantLock(), bufferSize);
- }
-
- public void flush(final DataStreamChannel channel) throws IOException {
- lock.lock();
- try {
- dataStreamChannel = channel;
- hasCapacity = true;
- setOutputMode();
- if (buffer().hasRemaining()) {
- dataStreamChannel.write(buffer());
- }
- if (!buffer().hasRemaining() && endStream) {
- dataStreamChannel.endStream();
- }
- condition.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- private void ensureNotAborted() throws InterruptedIOException {
- if (aborted) {
- throw new InterruptedIOException("Operation aborted");
- }
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len) throws IOException {
- final ByteBuffer src = ByteBuffer.wrap(b, off, len);
- lock.lock();
- try {
- ensureNotAborted();
- setInputMode();
- while (src.hasRemaining()) {
- // always buffer small chunks
- if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
- buffer().put(src);
- } else {
- if (buffer().position() > 0 || dataStreamChannel == null) {
- waitFlush();
- }
- if (buffer().position() == 0 && dataStreamChannel != null) {
- final int bytesWritten = dataStreamChannel.write(src);
- if (bytesWritten == 0) {
- hasCapacity = false;
- waitFlush();
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void write(final int b) throws IOException {
- lock.lock();
- try {
- ensureNotAborted();
- setInputMode();
- if (!buffer().hasRemaining()) {
- waitFlush();
- }
- buffer().put((byte)b);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void writeCompleted() throws IOException {
- if (endStream) {
- return;
- }
- lock.lock();
- try {
- if (!endStream) {
- endStream = true;
- if (dataStreamChannel != null) {
- setOutputMode();
- if (buffer().hasRemaining()) {
- dataStreamChannel.requestOutput();
- } else {
- dataStreamChannel.endStream();
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void waitFlush() throws InterruptedIOException {
- setOutputMode();
- if (dataStreamChannel != null) {
- dataStreamChannel.requestOutput();
- }
- ensureNotAborted();
- while (buffer().hasRemaining() || !hasCapacity) {
- try {
- condition.await();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException(ex.getMessage());
- }
- ensureNotAborted();
- }
- setInputMode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
deleted file mode 100644
index bd94886..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClassicServerExchangeHandler.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.nio.support;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpHeaders;
-import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.HttpStatus;
-import org.apache.hc.core5.http.ProtocolVersion;
-import org.apache.hc.core5.http.message.BasicHttpResponse;
-import org.apache.hc.core5.http.message.HttpResponseWrapper;
-import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http.nio.ResponseChannel;
-import org.apache.hc.core5.http.nio.entity.ContentInputStream;
-import org.apache.hc.core5.http.nio.entity.ContentOutputStream;
-import org.apache.hc.core5.http.nio.entity.SharedInputBuffer;
-import org.apache.hc.core5.http.nio.entity.SharedOutputBuffer;
-import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
-
-/**
- * @since 5.0
- */
-public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
-
- private enum State { IDLE, ACTIVE, COMPLETED }
-
- private final int initialBufferSize;
- private final Executor executor;
- private final AtomicReference<State> state;
- private final AtomicReference<Exception> exception;
-
- private volatile SharedInputBuffer inputBuffer;
- private volatile SharedOutputBuffer outputBuffer;
-
- public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
- this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
- this.executor = Args.notNull(executor, "Executor");
- this.exception = new AtomicReference<>(null);
- this.state = new AtomicReference<>(State.IDLE);
- }
-
- public Exception getException() {
- return exception.get();
- }
-
- protected abstract void handle(
- HttpRequest request, InputStream requestStream,
- HttpResponse response, OutputStream responseStream,
- HttpContext context) throws IOException, HttpException;
-
- @Override
- public final void handleRequest(
- final HttpRequest request,
- final EntityDetails entityDetails,
- final ResponseChannel responseChannel,
- final HttpContext context) throws HttpException, IOException {
- final AtomicBoolean responseCommitted = new AtomicBoolean(false);
-
- final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
- final HttpResponse responseWrapper = new HttpResponseWrapper(response){
-
- private void ensureNotCommitted() {
- Asserts.check(!responseCommitted.get(), "Response already committed");
- }
-
- @Override
- public void addHeader(final String name, final Object value) {
- ensureNotCommitted();
- super.addHeader(name, value);
- }
-
- @Override
- public void setHeader(final String name, final Object value) {
- ensureNotCommitted();
- super.setHeader(name, value);
- }
-
- @Override
- public void setVersion(final ProtocolVersion version) {
- ensureNotCommitted();
- super.setVersion(version);
- }
-
- @Override
- public void setCode(final int code) {
- ensureNotCommitted();
- super.setCode(code);
- }
-
- @Override
- public void setReasonPhrase(final String reason) {
- ensureNotCommitted();
- super.setReasonPhrase(reason);
- }
-
- @Override
- public void setLocale(final Locale locale) {
- ensureNotCommitted();
- super.setLocale(locale);
- }
-
- };
-
- final InputStream inputStream;
- if (entityDetails != null) {
- inputBuffer = new SharedInputBuffer(initialBufferSize);
- inputStream = new ContentInputStream(inputBuffer);
- } else {
- inputStream = null;
- }
- outputBuffer = new SharedOutputBuffer(initialBufferSize);
-
- final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
-
- private void triggerResponse() throws IOException {
- try {
- if (responseCommitted.compareAndSet(false, true)) {
- responseChannel.sendResponse(response, new EntityDetails() {
-
- @Override
- public long getContentLength() {
- return -1;
- }
-
- @Override
- public String getContentType() {
- final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
- return h != null ? h.getValue() : null;
- }
-
- @Override
- public String getContentEncoding() {
- final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
- return h != null ? h.getValue() : null;
- }
-
- @Override
- public boolean isChunked() {
- return false;
- }
-
- @Override
- public Set<String> getTrailerNames() {
- return null;
- }
-
- }, context);
- }
- } catch (final HttpException ex) {
- throw new IOException(ex.getMessage(), ex);
- }
- }
-
- @Override
- public void close() throws IOException {
- triggerResponse();
- super.close();
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len) throws IOException {
- triggerResponse();
- super.write(b, off, len);
- }
-
- @Override
- public void write(final byte[] b) throws IOException {
- triggerResponse();
- super.write(b);
- }
-
- @Override
- public void write(final int b) throws IOException {
- triggerResponse();
- super.write(b);
- }
-
- };
-
- if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- handle(request, inputStream, responseWrapper, outputStream, context);
- if (inputStream != null) {
- inputStream.close();
- }
- outputStream.close();
- } catch (final Exception ex) {
- exception.compareAndSet(null, ex);
- if (inputBuffer != null) {
- inputBuffer.abort();
- }
- outputBuffer.abort();
- } finally {
- state.set(State.COMPLETED);
- }
- }
-
- });
- }
- }
-
- @Override
- public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- if (inputBuffer != null) {
- inputBuffer.updateCapacity(capacityChannel);
- }
- }
-
- @Override
- public final int consume(final ByteBuffer src) throws IOException {
- Asserts.notNull(inputBuffer, "Input buffer");
- return inputBuffer.fill(src);
- }
-
- @Override
- public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- Asserts.notNull(inputBuffer, "Input buffer");
- inputBuffer.markEndStream();
- }
-
- @Override
- public final int available() {
- Asserts.notNull(outputBuffer, "Output buffer");
- return outputBuffer.length();
- }
-
- @Override
- public final void produce(final DataStreamChannel channel) throws IOException {
- Asserts.notNull(outputBuffer, "Output buffer");
- outputBuffer.flush(channel);
- }
-
- @Override
- public final void failed(final Exception cause) {
- exception.compareAndSet(null, cause);
- releaseResources();
- }
-
- @Override
- public void releaseResources() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java
new file mode 100644
index 0000000..782b0d2
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityConsumer.java
@@ -0,0 +1,134 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Bridges the asynchronous {@link AsyncEntityConsumer} callbacks to blocking,
+ * {@link InputStream} based processing.
+ * <p>
+ * Data passed to {@link #consume(ByteBuffer)} is put into a {@link SharedInputBuffer}.
+ * On {@link #streamStart(EntityDetails, FutureCallback)} a task is handed to the
+ * {@link Executor}; the task calls {@link #consumeData(ContentType, InputStream)}
+ * with a {@link ContentInputStream} created over that buffer.
+ *
+ * @param <T> type of the result returned by {@link #consumeData(ContentType, InputStream)}.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityConsumer<T> implements AsyncEntityConsumer<T> {
+
+    private enum State { IDLE, ACTIVE, COMPLETED }
+
+    private final Executor executor;
+    private final SharedInputBuffer buffer;
+    private final AtomicReference<State> state;
+    // Result of consumeData; stays null until the task has produced a value.
+    private final AtomicReference<T> resultRef;
+    // First cause passed to failed(); later causes are ignored.
+    private final AtomicReference<Exception> exceptionRef;
+
+    /**
+     * @param initialBufferSize size passed to the {@link SharedInputBuffer} constructor.
+     * @param executor executor that runs the consuming task; checked with {@code Args.notNull}.
+     */
+    public AbstractClassicEntityConsumer(final int initialBufferSize, final Executor executor) {
+        this.executor = Args.notNull(executor, "Executor");
+        this.buffer = new SharedInputBuffer(initialBufferSize);
+        this.state = new AtomicReference<>(State.IDLE);
+        this.resultRef = new AtomicReference<>(null);
+        this.exceptionRef = new AtomicReference<>(null);
+    }
+
+    /**
+     * Reads the entity content from the given stream and converts it into the result.
+     * Called from the task submitted to the executor by
+     * {@link #streamStart(EntityDetails, FutureCallback)}.
+     *
+     * @param contentType content type parsed from the entity details.
+     * @param inputStream stream created over the shared input buffer.
+     * @return the result; it is stored and passed to the result callback.
+     * @throws IOException if the content cannot be processed.
+     */
+    protected abstract T consumeData(ContentType contentType, InputStream inputStream) throws IOException;
+
+    /**
+     * Passes the capacity channel on to the shared input buffer.
+     */
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        buffer.updateCapacity(capacityChannel);
+    }
+
+    /**
+     * Parses the content type and, on the first call only, submits the consuming
+     * task to the executor. The task reports its outcome to {@code resultCallback}
+     * and moves the state to {@code COMPLETED} when it ends, whether it succeeded
+     * or failed.
+     *
+     * @throws UnsupportedEncodingException if parsing the content type reports an
+     *         unsupported charset; the original exception is attached as the cause.
+     */
+    @Override
+    public final void streamStart(final EntityDetails entityDetails, final FutureCallback<T> resultCallback) throws HttpException, IOException {
+        final ContentType contentType;
+        try {
+            contentType = ContentType.parse(entityDetails.getContentType());
+        } catch (final UnsupportedCharsetException ex) {
+            // UnsupportedEncodingException has no constructor taking a cause,
+            // so attach the original exception explicitly to keep its stack trace.
+            final UnsupportedEncodingException unsupported = new UnsupportedEncodingException(ex.getMessage());
+            unsupported.initCause(ex);
+            throw unsupported;
+        }
+        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        final T result = consumeData(contentType, new ContentInputStream(buffer));
+                        resultRef.set(result);
+                        resultCallback.completed(result);
+                    } catch (final Exception ex) {
+                        // Abort the buffer before reporting the failure to the callback.
+                        buffer.abort();
+                        resultCallback.failed(ex);
+                    } finally {
+                        state.set(State.COMPLETED);
+                    }
+                }
+
+            });
+        }
+    }
+
+    /**
+     * Hands the given bytes to the shared input buffer and returns the value
+     * reported by its {@code fill} call.
+     */
+    @Override
+    public final int consume(final ByteBuffer src) throws IOException {
+        return buffer.fill(src);
+    }
+
+    /**
+     * Marks the end of the stream on the shared input buffer.
+     * The {@code trailers} argument is not used.
+     */
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        buffer.markEndStream();
+    }
+
+    /**
+     * Records the cause and calls {@link #releaseResources()}, but only for the
+     * first failure reported; later calls have no effect.
+     */
+    @Override
+    public final void failed(final Exception cause) {
+        if (exceptionRef.compareAndSet(null, cause)) {
+            releaseResources();
+        }
+    }
+
+    /**
+     * @return the first cause passed to {@link #failed(Exception)}, or {@code null}
+     *         if none has been recorded.
+     */
+    public final Exception getException() {
+        return exceptionRef.get();
+    }
+
+    /**
+     * @return the result stored by the consuming task, or {@code null} if no
+     *         result has been stored.
+     */
+    @Override
+    public final T getContent() {
+        return resultRef.get();
+    }
+
+    /**
+     * Does nothing in this class. The method is not final, so subclasses may
+     * override it.
+     */
+    @Override
+    public void releaseResources() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java
new file mode 100644
index 0000000..5ca791f
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicEntityProducer.java
@@ -0,0 +1,135 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * @since 5.0
+ */
+public abstract class AbstractClassicEntityProducer implements AsyncEntityProducer {
+
+ private enum State { IDLE, ACTIVE, COMPLETED }
+
+ private final SharedOutputBuffer buffer;
+ private final ContentType contentType;
+ private final Executor executor;
+ private final AtomicReference<State> state;
+ private final AtomicReference<Exception> exception;
+
+ public AbstractClassicEntityProducer(final int initialBufferSize, final ContentType contentType, final Executor executor) {
+ this.buffer = new SharedOutputBuffer(initialBufferSize);
+ this.contentType = contentType;
+ this.executor = Args.notNull(executor, "Executor");
+ this.state = new AtomicReference<>(State.IDLE);
+ this.exception = new AtomicReference<>(null);
+ }
+
+ @Override
+ public final boolean isRepeatable() {
+ return false;
+ }
+
+ protected abstract void produceData(ContentType contentType, OutputStream outputStream) throws IOException;
+
+ @Override
+ public final int available() {
+ return buffer.length();
+ }
+
+ @Override
+ public final void produce(final DataStreamChannel channel) throws IOException {
+ if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ produceData(contentType, new ContentOutputStream(buffer));
+ buffer.writeCompleted();
+ } catch (final Exception ex) {
+ buffer.abort();
+ } finally {
+ state.set(State.COMPLETED);
+ }
+ }
+
+ });
+ }
+ buffer.flush(channel);
+ }
+
+ @Override
+ public final long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public final String getContentType() {
+ return contentType != null ? contentType.toString() : null;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return null;
+ }
+
+ @Override
+ public final boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public final Set<String> getTrailerNames() {
+ return null;
+ }
+
+ @Override
+ public final void failed(final Exception cause) {
+ if (exception.compareAndSet(null, cause)) {
+ releaseResources();
+ }
+ }
+
+ public final Exception getException() {
+ return exception.get();
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+}