You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/06/08 12:54:24 UTC
[incubator-servicecomb-java-chassis] 02/05: [SCB-484][WIP] enhance
pump, support auto close stream and pump to OutputStream
This is an automated email from the ASF dual-hosted git repository.
wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 13385d826f8bea93297a238e10b596e3042b603b
Author: wujimin <wu...@huawei.com>
AuthorDate: Tue Jun 5 12:14:07 2018 +0800
[SCB-484][WIP] enhance pump, support auto close stream and pump to OutputStream
---
.../foundation/common/io/AsyncCloseable.java | 23 +++
.../vertx/stream/InputStreamToReadStream.java | 40 +++--
.../vertx/stream/OutputStreamToWriteStream.java | 200 +++++++++++++++++++++
.../foundation/vertx/stream/PumpCommon.java | 83 +++++++++
.../foundation/vertx/stream/PumpFromPart.java | 107 +++++++++++
.../foundation/vertx/stream/TestPumpFromPart.java | 157 ++++++++++++++++
6 files changed, 600 insertions(+), 10 deletions(-)
diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
new file mode 100644
index 0000000..a337565
--- /dev/null
+++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.common.io;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface AsyncCloseable<T> {
+ CompletableFuture<T> close();
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
index 2066dfe..01d6586 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
@@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.Unpooled;
import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
@@ -36,7 +36,7 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
public static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
- private Vertx vertx;
+ private Context context;
private InputStream inputStream;
@@ -54,14 +54,15 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
private Handler<Void> endHandler;
- public InputStreamToReadStream(Vertx vertx, InputStream inputStream) {
- this.vertx = vertx;
+ private boolean autoCloseInputStream;
+
+ public InputStreamToReadStream(Context context, InputStream inputStream,
+ boolean autoCloseInputStream) {
+ this.context = context;
this.inputStream = inputStream;
+ this.autoCloseInputStream = autoCloseInputStream;
}
- public InputStream getInputStream() {
- return inputStream;
- }
public synchronized InputStreamToReadStream readBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
@@ -113,8 +114,8 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
if (!readInProgress) {
readInProgress = true;
- vertx.executeBlocking(this::readInWorker,
- false,
+ context.executeBlocking(this::readInWorker,
+ true,
this::afterReadInEventloop);
}
}
@@ -129,9 +130,14 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
}
}
+ private void handleException(Throwable e) {
+ closeInputStream();
+ exceptionHandler.handle(e);
+ }
+
private synchronized void afterReadInEventloop(AsyncResult<ReadResult> ar) {
if (ar.failed()) {
- exceptionHandler.handle(ar.cause());
+ handleException(ar.cause());
return;
}
@@ -175,11 +181,25 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
private synchronized void handleEnd() {
dataHandler = null;
+ closeInputStream();
if (endHandler != null) {
endHandler.handle(null);
}
}
+ private void closeInputStream() {
+ closed = true;
+ if (!autoCloseInputStream) {
+ return;
+ }
+
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ LOGGER.error("failed to close inputSteam.", e);
+ }
+ }
+
@Override
public ReadStream<Buffer> endHandler(Handler<Void> handler) {
check();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
new file mode 100644
index 0000000..9e69759
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * for pump from a readStream
+ */
+public class OutputStreamToWriteStream implements WriteStream<Buffer>, AsyncCloseable<Void> {
+ private static final int DEFAULT_MAX_BUFFERS = 4;
+
+ private static final int SMALLEST_MAX_BUFFERS = 2;
+
+ private OutputStream outputStream;
+
+ private Context context;
+
+ private boolean autoCloseOutputStream;
+
+ private Handler<Throwable> exceptionHandler;
+
+ // resume readStream
+ private Handler<Void> drainHandler;
+
+ // when invoke close, but outputStream not write all data, must put close logic to closedDeferred
+ private Runnable closedDeferred;
+
+ private boolean closed;
+
+ // buffers.size() need to loop all node, and maybe result is not correct in concurrent condition
+ // we just need to flow control by pump, so use another size
+ private Queue<Buffer> buffers = new ConcurrentLinkedQueue<>();
+
+ private int currentBufferCount;
+
+ // just indicate if buffers is full, not control add logic
+ // must >= SMALLEST_MAX_BUFFERS
+ // if < SMALLEST_MAX_BUFFERS, then maxBuffers will be SMALLEST_MAX_BUFFERS
+ private int maxBuffers = DEFAULT_MAX_BUFFERS;
+
+ // if currentBufferCount <= drainMark, will invoke drainHandler to resume readStream
+ private int drainMark = maxBuffers / 2;
+
+ public OutputStreamToWriteStream(Context context, OutputStream outputStream,
+ boolean autoCloseOutputStream) {
+ this.context = context;
+ this.outputStream = outputStream;
+ this.autoCloseOutputStream = autoCloseOutputStream;
+ }
+
+ @Override
+ public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
+ this.exceptionHandler = handler;
+ return this;
+ }
+
+ private void handleException(Throwable t) {
+ if (exceptionHandler != null) {
+ exceptionHandler.handle(t);
+ }
+ }
+
+ @Override
+ public synchronized WriteStream<Buffer> write(Buffer data) {
+ currentBufferCount++;
+ buffers.add(data);
+ context.executeBlocking(this::writeInWorker,
+ true,
+ ar -> {
+ if (ar.failed()) {
+ handleException(ar.cause());
+ }
+ });
+ return this;
+ }
+
+ protected void writeInWorker(Future<Object> future) {
+ while (true) {
+ Buffer buffer = buffers.poll();
+ if (buffer == null) {
+ future.complete();
+ return;
+ }
+
+ try {
+ outputStream.write(buffer.getBytes());
+
+ synchronized (OutputStreamToWriteStream.this) {
+ currentBufferCount--;
+ Runnable action = (currentBufferCount == 0 && closedDeferred != null) ? closedDeferred : this::checkDrained;
+ action.run();
+ }
+ } catch (IOException e) {
+ currentBufferCount--;
+ future.fail(e);
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void end() {
+ close();
+ }
+
+ @Override
+ public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
+ this.maxBuffers = maxSize < SMALLEST_MAX_BUFFERS ? SMALLEST_MAX_BUFFERS : maxSize;
+ this.drainMark = maxBuffers / 2;
+ return this;
+ }
+
+ @Override
+ public synchronized boolean writeQueueFull() {
+ return currentBufferCount >= maxBuffers;
+ }
+
+ @Override
+ public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
+ this.drainHandler = handler;
+ return this;
+ }
+
+ private synchronized void checkDrained() {
+ if (drainHandler != null && currentBufferCount <= drainMark) {
+ Handler<Void> handler = drainHandler;
+ drainHandler = null;
+ handler.handle(null);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return closeInternal();
+ }
+
+ private void check() {
+ checkClosed();
+ }
+
+ private void checkClosed() {
+ if (closed) {
+ throw new IllegalStateException(this.getClass().getName() + " is closed");
+ }
+ }
+
+ private synchronized CompletableFuture<Void> closeInternal() {
+ check();
+
+ closed = true;
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (currentBufferCount == 0) {
+ doClose(future);
+ } else {
+ closedDeferred = () -> doClose(future);
+ }
+ return future;
+ }
+
+ private void doClose(CompletableFuture<Void> future) {
+ if (autoCloseOutputStream) {
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ future.completeExceptionally(new IllegalStateException("failed to close outputStream.", e));
+ return;
+ }
+ }
+
+ future.complete(null);
+ }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
new file mode 100644
index 0000000..156510a
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpCommon {
+ /**
+ *
+ * @param context
+ * @param readStream
+ * @param writeStream
+ * @return future of save action<br>
+ * <p>important:
+ * <p> if writeStream is AsyncCloseable, future means write complete
+ * <p> if writeStream is not AsyncCloseable, future only means read complete
+ */
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+
+ writeStream.exceptionHandler(readFuture::completeExceptionally);
+ readStream.exceptionHandler(readFuture::completeExceptionally);
+ // just means read finished, not means write finished
+ readStream.endHandler(readFuture::complete);
+
+ // if readStream(HttpClientResponse) and awriteStream(HttpServerResponse)
+ // belongs to difference eventloop
+ // maybe will cause deadlock
+ // if happened, vertx will print deadlock stacks
+ Pump.pump(readStream, writeStream).start();
+ try {
+ context.runOnContext(v -> readStream.resume());
+ } catch (Throwable e) {
+ readFuture.completeExceptionally(e);
+ }
+
+ if (!AsyncCloseable.class.isInstance(writeStream)) {
+ return readFuture;
+ }
+
+ return closeWriteStream((AsyncCloseable<Void>) writeStream, readFuture);
+ }
+
+ protected CompletableFuture<Void> closeWriteStream(AsyncCloseable<Void> writeStream,
+ CompletableFuture<Void> readFuture) {
+ CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+ readFuture.whenComplete((v, e) ->
+ writeStream.close().whenComplete((wv, we) -> {
+ if (we != null) {
+ writeFuture.completeExceptionally(we);
+ return;
+ }
+
+ writeFuture.complete(null);
+ })
+ );
+
+ return CompletableFuture.allOf(readFuture, writeFuture);
+ }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
new file mode 100644
index 0000000..348321c
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.servicecomb.foundation.vertx.http.DownloadUtils;
+import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpFromPart {
+ private Context context;
+
+ private Part part;
+
+ public PumpFromPart(Context context, Part part) {
+ this.context = context;
+ this.part = part;
+ }
+
+ private CompletableFuture<ReadStream<Buffer>> prepareReadStream() {
+ CompletableFuture<ReadStream<Buffer>> future = new CompletableFuture<>();
+
+ if (ReadStreamPart.class.isInstance(part)) {
+ future.complete(((ReadStreamPart) part).getReadStream());
+ return future;
+ }
+
+ try {
+ InputStream inputStream = part.getInputStream();
+ InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context, inputStream, true);
+ inputStreamToReadStream.pause();
+ future.complete(inputStreamToReadStream);
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+
+ return future;
+ }
+
+ public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+ return prepareReadStream()
+ .thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream))
+ .whenComplete((v, e) -> DownloadUtils.clearPartResource(part));
+ }
+
+ public CompletableFuture<Void> toOutputStream(OutputStream outputStream, boolean autoCloseOutputStream) {
+ if (context == null) {
+ return toOutputStreamSync(outputStream, autoCloseOutputStream);
+ }
+
+ return toOutputStreamAsync(outputStream, autoCloseOutputStream);
+ }
+
+ private CompletableFuture<Void> toOutputStreamAsync(OutputStream outputStream, boolean autoCloseOutputStream) {
+ OutputStreamToWriteStream outputStreamToWriteStream = new OutputStreamToWriteStream(context, outputStream,
+ autoCloseOutputStream);
+ return toWriteStream(outputStreamToWriteStream);
+ }
+
+ // DO NOT use a mocked sync context to unify the pump logic
+ // otherwise when pump big stream, will cause stack overflow
+ private CompletableFuture<Void> toOutputStreamSync(OutputStream outputStream, boolean autoCloseOutputStream) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ try (InputStream inputStream = part.getInputStream()) {
+ IOUtils.copyLarge(inputStream, outputStream);
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ }
+
+ if (autoCloseOutputStream) {
+ try {
+ outputStream.close();
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ }
+ }
+
+ future.complete(null);
+ return future;
+ }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
new file mode 100644
index 0000000..96899e8
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.servicecomb.foundation.common.part.InputStreamPart;
+import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream.ReadResult;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.impl.SyncContext;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+
+public class TestPumpFromPart {
+ String src = RandomStringUtils.random(100);
+
+ boolean inputStreamClosed;
+
+ InputStream inputStream = new ByteArrayInputStream(src.getBytes()) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ inputStreamClosed = true;
+ }
+ };
+
+ Part part;
+
+ boolean outputStreamClosed;
+
+ BufferOutputStream outputStream;
+
+ IOException error = new IOException();
+
+ Context context = new SyncContext();
+
+ private void run(Context context, boolean closeOutput) throws Throwable {
+ inputStream.reset();
+ part = new InputStreamPart("name", inputStream);
+
+ outputStream = new BufferOutputStream() {
+ @Override
+ public void close() {
+ super.close();
+ outputStreamClosed = true;
+ }
+ };
+
+ new PumpFromPart(context, part).toOutputStream(outputStream, closeOutput).get();
+ }
+
+ public void do_pump_succ(Context context) throws Throwable {
+ run(context, true);
+
+ Assert.assertEquals(src, outputStream.getBuffer().toString());
+ Assert.assertTrue(inputStreamClosed);
+ Assert.assertTrue(outputStreamClosed);
+ }
+
+ @Test
+ public void pump_succ() throws Throwable {
+ do_pump_succ(null);
+ do_pump_succ(context);
+ }
+
+ public void do_pump_outputNotClose(Context context) throws Throwable {
+ run(context, false);
+
+ Assert.assertEquals(src, outputStream.getBuffer().toString());
+ Assert.assertFalse(outputStreamClosed);
+ }
+
+ @Test
+ public void pump_outputNotClose() throws Throwable {
+ do_pump_outputNotClose(null);
+ do_pump_outputNotClose(context);
+ }
+
+ public void pump_error(Context context) {
+ try {
+ run(context, true);
+ Assert.fail("must throw exception");
+ } catch (Throwable e) {
+ Assert.assertThat(e, Matchers.instanceOf(ExecutionException.class));
+ Assert.assertThat(e.getCause(), Matchers.sameInstance(error));
+ }
+
+ Assert.assertTrue(inputStreamClosed);
+ Assert.assertTrue(outputStreamClosed);
+ }
+
+ @Test
+ public void pump_read_error() throws IOException {
+ new MockUp<InputStreamToReadStream>() {
+ @Mock
+ void readInWorker(Future<ReadResult> future) {
+ future.fail(error);
+ }
+ };
+ new Expectations(IOUtils.class) {
+ {
+ IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+ result = error;
+ }
+ };
+
+ pump_error(null);
+ pump_error(context);
+ }
+
+ @Test
+ public void pump_write_error() throws IOException {
+ new MockUp<BufferOutputStream>() {
+ @Mock
+ void write(byte[] b) throws IOException {
+ throw error;
+ }
+ };
+ new Expectations(IOUtils.class) {
+ {
+ IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+ result = error;
+ }
+ };
+
+ pump_error(null);
+ pump_error(context);
+ }
+}
--
To stop receiving notification emails like this one, please contact
wujimin@apache.org.