You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/06/08 12:54:24 UTC

[incubator-servicecomb-java-chassis] 02/05: [SCB-484][WIP] enhance pump, support auto close stream and pump to OutputStream

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 13385d826f8bea93297a238e10b596e3042b603b
Author: wujimin <wu...@huawei.com>
AuthorDate: Tue Jun 5 12:14:07 2018 +0800

    [SCB-484][WIP] enhance pump, support auto close stream and pump to OutputStream
---
 .../foundation/common/io/AsyncCloseable.java       |  23 +++
 .../vertx/stream/InputStreamToReadStream.java      |  40 +++--
 .../vertx/stream/OutputStreamToWriteStream.java    | 200 +++++++++++++++++++++
 .../foundation/vertx/stream/PumpCommon.java        |  83 +++++++++
 .../foundation/vertx/stream/PumpFromPart.java      | 107 +++++++++++
 .../foundation/vertx/stream/TestPumpFromPart.java  | 157 ++++++++++++++++
 6 files changed, 600 insertions(+), 10 deletions(-)

diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
new file mode 100644
index 0000000..a337565
--- /dev/null
+++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.common.io;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A resource whose close operation is reported through a {@link CompletableFuture}.
+ *
+ * @param <T> type of the value carried by the future returned from {@link #close()}
+ */
+public interface AsyncCloseable<T> {
+  /**
+   * Closes this resource.
+   *
+   * @return a future representing the close operation
+   */
+  CompletableFuture<T> close();
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
index 2066dfe..01d6586 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
@@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.Unpooled;
 import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.streams.ReadStream;
 
@@ -36,7 +36,7 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
 
   public static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
 
-  private Vertx vertx;
+  private Context context;
 
   private InputStream inputStream;
 
@@ -54,14 +54,15 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
 
   private Handler<Void> endHandler;
 
-  public InputStreamToReadStream(Vertx vertx, InputStream inputStream) {
-    this.vertx = vertx;
+  private boolean autoCloseInputStream;
+
+  public InputStreamToReadStream(Context context, InputStream inputStream,
+      boolean autoCloseInputStream) {
+    this.context = context;
     this.inputStream = inputStream;
+    this.autoCloseInputStream = autoCloseInputStream;
   }
 
-  public InputStream getInputStream() {
-    return inputStream;
-  }
 
   public synchronized InputStreamToReadStream readBufferSize(int readBufferSize) {
     this.readBufferSize = readBufferSize;
@@ -113,8 +114,8 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
     if (!readInProgress) {
       readInProgress = true;
 
-      vertx.executeBlocking(this::readInWorker,
-          false,
+      context.executeBlocking(this::readInWorker,
+          true,
           this::afterReadInEventloop);
     }
   }
@@ -129,9 +130,14 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
     }
   }
 
+  /**
+   * Closes the inputStream (see closeInputStream) and reports the failure to the registered exceptionHandler.
+   * If no exceptionHandler has been registered, the failure is logged instead of causing a NullPointerException.
+   *
+   * @param e the failure to report
+   */
+  private void handleException(Throwable e) {
+    closeInputStream();
+    if (exceptionHandler != null) {
+      exceptionHandler.handle(e);
+      return;
+    }
+
+    LOGGER.error("failed to read from inputStream, and no exceptionHandler is registered.", e);
+  }
+
   private synchronized void afterReadInEventloop(AsyncResult<ReadResult> ar) {
     if (ar.failed()) {
-      exceptionHandler.handle(ar.cause());
+      handleException(ar.cause());
       return;
     }
 
@@ -175,11 +181,25 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
 
   private synchronized void handleEnd() {
     dataHandler = null;
+    closeInputStream();
     if (endHandler != null) {
       endHandler.handle(null);
     }
   }
 
+  /**
+   * Marks this stream as closed and, when autoCloseInputStream is set, closes the wrapped inputStream.
+   * A failure to close the inputStream is logged and not propagated.
+   */
+  private void closeInputStream() {
+    closed = true;
+    if (autoCloseInputStream) {
+      try {
+        inputStream.close();
+      } catch (IOException closeError) {
+        LOGGER.error("failed to close inputSteam.", closeError);
+      }
+    }
+  }
+
   @Override
   public ReadStream<Buffer> endHandler(Handler<Void> handler) {
     check();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
new file mode 100644
index 0000000..9e69759
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Adapts a blocking {@link OutputStream} to a vert.x {@link WriteStream}, for pump from a readStream.
+ * <p>
+ * Buffers passed to {@link #write(Buffer)} are queued and written to the outputStream by a task submitted
+ * through {@code context.executeBlocking}. {@link #writeQueueFull()} and the drain handler provide flow control
+ * based on the number of buffers that have been accepted but not yet handled.
+ * <p>
+ * {@link #close()} waits for the pending buffers before it closes the outputStream; the returned future
+ * fails if closing the outputStream fails or if any write to the outputStream failed.
+ */
+public class OutputStreamToWriteStream implements WriteStream<Buffer>, AsyncCloseable<Void> {
+  private static final int DEFAULT_MAX_BUFFERS = 4;
+
+  private static final int SMALLEST_MAX_BUFFERS = 2;
+
+  private OutputStream outputStream;
+
+  private Context context;
+
+  private boolean autoCloseOutputStream;
+
+  private Handler<Throwable> exceptionHandler;
+
+  // resume readStream
+  private Handler<Void> drainHandler;
+
+  // when close is invoked before all queued data has been written,
+  // the close logic is stored here and executed after the last pending buffer has been handled
+  private Runnable closedDeferred;
+
+  private boolean closed;
+
+  // first failure raised while writing to outputStream, reported by the future returned from close
+  private Throwable writeFailure;
+
+  // buffers.size() need to loop all node, and maybe result is not correct in concurrent condition
+  // we just need to flow control by pump, so use another size
+  private Queue<Buffer> buffers = new ConcurrentLinkedQueue<>();
+
+  // number of buffers accepted by write and not yet handled by writeInWorker, guarded by this
+  private int currentBufferCount;
+
+  // just indicate if buffers is full, not control add logic
+  // must >= SMALLEST_MAX_BUFFERS
+  // if < SMALLEST_MAX_BUFFERS, then maxBuffers will be SMALLEST_MAX_BUFFERS
+  private int maxBuffers = DEFAULT_MAX_BUFFERS;
+
+  // if currentBufferCount <= drainMark, will invoke drainHandler to resume readStream
+  private int drainMark = maxBuffers / 2;
+
+  /**
+   * @param context context used to run the blocking writes
+   * @param outputStream target of all written buffers
+   * @param autoCloseOutputStream whether {@link #close()} should also close the outputStream
+   */
+  public OutputStreamToWriteStream(Context context, OutputStream outputStream,
+      boolean autoCloseOutputStream) {
+    this.context = context;
+    this.outputStream = outputStream;
+    this.autoCloseOutputStream = autoCloseOutputStream;
+  }
+
+  @Override
+  public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
+    this.exceptionHandler = handler;
+    return this;
+  }
+
+  private void handleException(Throwable t) {
+    if (exceptionHandler != null) {
+      exceptionHandler.handle(t);
+    }
+  }
+
+  /**
+   * Queues the buffer and schedules a blocking task that writes the queued buffers to the outputStream.
+   * A failed task is reported to the exceptionHandler.
+   */
+  @Override
+  public synchronized WriteStream<Buffer> write(Buffer data) {
+    currentBufferCount++;
+    buffers.add(data);
+    context.executeBlocking(this::writeInWorker,
+        true,
+        ar -> {
+          if (ar.failed()) {
+            handleException(ar.cause());
+          }
+        });
+    return this;
+  }
+
+  /**
+   * Writes queued buffers to the outputStream until the queue is empty or a write fails.
+   *
+   * @param future completed when the queue is empty, failed with the cause of the first failed write
+   */
+  protected void writeInWorker(Future<Object> future) {
+    while (true) {
+      Buffer buffer = buffers.poll();
+      if (buffer == null) {
+        future.complete();
+        return;
+      }
+
+      try {
+        outputStream.write(buffer.getBytes());
+      } catch (IOException | RuntimeException e) {
+        // keep the pending count correct even for a failed buffer,
+        // otherwise a deferred close would wait forever
+        onWriteFailed(e);
+        future.fail(e);
+        return;
+      }
+
+      onWriteSucceeded();
+    }
+  }
+
+  // one buffer has been written: run the deferred close if it was the last one, otherwise check the drain mark
+  private synchronized void onWriteSucceeded() {
+    currentBufferCount--;
+    if (!runDeferredCloseIfIdle()) {
+      checkDrained();
+    }
+  }
+
+  // one buffer failed to be written: remember the first failure and run the deferred close if nothing is pending
+  private synchronized void onWriteFailed(Throwable failure) {
+    currentBufferCount--;
+    if (writeFailure == null) {
+      writeFailure = failure;
+    }
+    runDeferredCloseIfIdle();
+  }
+
+  // caller must hold the lock of this instance
+  // returns true if the deferred close logic has been executed
+  private boolean runDeferredCloseIfIdle() {
+    if (currentBufferCount != 0 || closedDeferred == null) {
+      return false;
+    }
+
+    Runnable deferred = closedDeferred;
+    // make sure that the close logic is executed only once
+    closedDeferred = null;
+    deferred.run();
+    return true;
+  }
+
+  @Override
+  public void end() {
+    close();
+  }
+
+  /**
+   * Sets the number of pending buffers at which {@link #writeQueueFull()} reports true.
+   * Values smaller than SMALLEST_MAX_BUFFERS are replaced by SMALLEST_MAX_BUFFERS.
+   */
+  @Override
+  public synchronized WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
+    this.maxBuffers = maxSize < SMALLEST_MAX_BUFFERS ? SMALLEST_MAX_BUFFERS : maxSize;
+    this.drainMark = maxBuffers / 2;
+    return this;
+  }
+
+  @Override
+  public synchronized boolean writeQueueFull() {
+    return currentBufferCount >= maxBuffers;
+  }
+
+  @Override
+  public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
+    this.drainHandler = handler;
+    return this;
+  }
+
+  // the drainHandler is one-shot: it is removed before it is invoked
+  private synchronized void checkDrained() {
+    if (drainHandler != null && currentBufferCount <= drainMark) {
+      Handler<Void> handler = drainHandler;
+      drainHandler = null;
+      handler.handle(null);
+    }
+  }
+
+  /**
+   * Closes this stream. If buffers are still pending, the close is deferred until the last of them
+   * has been handled.
+   *
+   * @return a future that completes after the close logic has run; it fails if closing the outputStream
+   * fails or if a write to the outputStream failed
+   * @throws IllegalStateException if this stream is already closed
+   */
+  @Override
+  public CompletableFuture<Void> close() {
+    return closeInternal();
+  }
+
+  private void check() {
+    checkClosed();
+  }
+
+  private void checkClosed() {
+    if (closed) {
+      throw new IllegalStateException(this.getClass().getName() + " is closed");
+    }
+  }
+
+  private synchronized CompletableFuture<Void> closeInternal() {
+    check();
+
+    closed = true;
+
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    if (currentBufferCount == 0) {
+      doClose(future);
+    } else {
+      closedDeferred = () -> doClose(future);
+    }
+    return future;
+  }
+
+  // always invoked while holding the lock of this instance
+  private void doClose(CompletableFuture<Void> future) {
+    if (autoCloseOutputStream) {
+      try {
+        outputStream.close();
+      } catch (IOException e) {
+        future.completeExceptionally(new IllegalStateException("failed to close outputStream.", e));
+        return;
+      }
+    }
+
+    if (writeFailure != null) {
+      future.completeExceptionally(writeFailure);
+      return;
+    }
+
+    future.complete(null);
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
new file mode 100644
index 0000000..156510a
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Pumps a {@link ReadStream} into a {@link WriteStream} and reports the result through a
+ * {@link CompletableFuture}.
+ */
+public class PumpCommon {
+  /**
+   * Starts a pump from readStream to writeStream and resumes the readStream on the given context.
+   *
+   * @param context context used to resume the readStream
+   * @param readStream source of the pump
+   * @param writeStream target of the pump
+   * @return future of save action<br>
+   * <p>important:
+   * <p>  if writeStream is AsyncCloseable, future means write complete
+   * <p>  if writeStream is not AsyncCloseable, future only means read complete
+   */
+  @SuppressWarnings("unchecked")
+  public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
+    CompletableFuture<Void> readFuture = new CompletableFuture<>();
+
+    writeStream.exceptionHandler(readFuture::completeExceptionally);
+    readStream.exceptionHandler(readFuture::completeExceptionally);
+    // just means read finished, not means write finished
+    readStream.endHandler(readFuture::complete);
+
+    // if readStream(HttpClientResponse) and writeStream(HttpServerResponse)
+    // belong to different eventloops
+    // maybe will cause deadlock
+    // if happened, vertx will print deadlock stacks
+    Pump.pump(readStream, writeStream).start();
+    try {
+      context.runOnContext(v -> readStream.resume());
+    } catch (Throwable e) {
+      readFuture.completeExceptionally(e);
+    }
+
+    if (!(writeStream instanceof AsyncCloseable)) {
+      return readFuture;
+    }
+
+    return closeWriteStream((AsyncCloseable<Void>) writeStream, readFuture);
+  }
+
+  /**
+   * Closes the writeStream after the read side has finished, no matter whether it succeeded or failed.
+   *
+   * @param writeStream stream to close when readFuture completes
+   * @param readFuture future of the read side
+   * @return a future that completes when both the read and the close have completed,
+   * and fails if any of them failed
+   */
+  protected CompletableFuture<Void> closeWriteStream(AsyncCloseable<Void> writeStream,
+      CompletableFuture<Void> readFuture) {
+    CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+    readFuture.whenComplete((v, e) -> {
+      CompletableFuture<Void> closeFuture;
+      try {
+        closeFuture = writeStream.close();
+      } catch (Throwable closeError) {
+        // close can fail synchronously, eg: the stream is already closed
+        // must complete writeFuture, otherwise the returned future never completes
+        writeFuture.completeExceptionally(closeError);
+        return;
+      }
+
+      closeFuture.whenComplete((wv, we) -> {
+        if (we != null) {
+          writeFuture.completeExceptionally(we);
+          return;
+        }
+
+        writeFuture.complete(null);
+      });
+    });
+
+    return CompletableFuture.allOf(readFuture, writeFuture);
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
new file mode 100644
index 0000000..348321c
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.servicecomb.foundation.vertx.http.DownloadUtils;
+import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Pumps the content of a {@link Part} to a {@link WriteStream} or to an {@link OutputStream}.
+ */
+public class PumpFromPart {
+  private Context context;
+
+  private Part part;
+
+  /**
+   * @param context context used for the asynchronous pump; when null, toOutputStream copies synchronously
+   * @param part source of the data
+   */
+  public PumpFromPart(Context context, Part part) {
+    this.context = context;
+    this.part = part;
+  }
+
+  // a ReadStreamPart provides its own readStream, any other part is wrapped by a paused InputStreamToReadStream
+  private CompletableFuture<ReadStream<Buffer>> prepareReadStream() {
+    CompletableFuture<ReadStream<Buffer>> result = new CompletableFuture<>();
+
+    if (part instanceof ReadStreamPart) {
+      ReadStreamPart readStreamPart = (ReadStreamPart) part;
+      result.complete(readStreamPart.getReadStream());
+      return result;
+    }
+
+    try {
+      InputStreamToReadStream wrapped = new InputStreamToReadStream(context, part.getInputStream(), true);
+      wrapped.pause();
+      result.complete(wrapped);
+    } catch (IOException openError) {
+      result.completeExceptionally(openError);
+    }
+
+    return result;
+  }
+
+  /**
+   * Pumps the part to the writeStream, and clears the part resource when the pump has finished.
+   *
+   * @param writeStream target of the pump
+   * @return future of the pump, see {@link PumpCommon#pump}
+   */
+  public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+    CompletableFuture<ReadStream<Buffer>> readStreamFuture = prepareReadStream();
+    CompletableFuture<Void> pumpFuture = readStreamFuture
+        .thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream));
+    return pumpFuture.whenComplete((ignoredResult, ignoredError) -> DownloadUtils.clearPartResource(part));
+  }
+
+  /**
+   * Copies the part to the outputStream, synchronously if there is no context, otherwise by an asynchronous pump.
+   *
+   * @param outputStream target of the copy
+   * @param autoCloseOutputStream whether the outputStream should be closed after the copy
+   * @return future of the copy
+   */
+  public CompletableFuture<Void> toOutputStream(OutputStream outputStream, boolean autoCloseOutputStream) {
+    return context == null
+        ? toOutputStreamSync(outputStream, autoCloseOutputStream)
+        : toOutputStreamAsync(outputStream, autoCloseOutputStream);
+  }
+
+  private CompletableFuture<Void> toOutputStreamAsync(OutputStream outputStream, boolean autoCloseOutputStream) {
+    return toWriteStream(new OutputStreamToWriteStream(context, outputStream, autoCloseOutputStream));
+  }
+
+  // DO NOT use a mocked sync context to unify the pump logic
+  // otherwise when pump big stream, will cause stack overflow
+  private CompletableFuture<Void> toOutputStreamSync(OutputStream outputStream, boolean autoCloseOutputStream) {
+    // only the first failure is reported
+    Throwable failure = null;
+
+    try (InputStream source = part.getInputStream()) {
+      IOUtils.copyLarge(source, outputStream);
+    } catch (Throwable copyError) {
+      failure = copyError;
+    }
+
+    // the outputStream is closed even if the copy failed
+    if (autoCloseOutputStream) {
+      try {
+        outputStream.close();
+      } catch (Throwable closeError) {
+        if (failure == null) {
+          failure = closeError;
+        }
+      }
+    }
+
+    CompletableFuture<Void> result = new CompletableFuture<>();
+    if (failure != null) {
+      result.completeExceptionally(failure);
+    } else {
+      result.complete(null);
+    }
+    return result;
+  }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
new file mode 100644
index 0000000..96899e8
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.servicecomb.foundation.common.part.InputStreamPart;
+import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream.ReadResult;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.impl.SyncContext;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+
+/**
+ * Tests {@link PumpFromPart#toOutputStream}: every scenario is executed twice,
+ * once with a null context and once with a {@link SyncContext}.
+ */
+public class TestPumpFromPart {
+  // content that is pumped from the part to the outputStream
+  String src = RandomStringUtils.random(100);
+
+  // set to true by the close method of inputStream
+  // NOTE(review): the closed flags are never reset to false, so once the first run inside a test has set them,
+  // the assertTrue checks of the second run cannot fail because of them
+  boolean inputStreamClosed;
+
+  InputStream inputStream = new ByteArrayInputStream(src.getBytes()) {
+    @Override
+    public void close() throws IOException {
+      super.close();
+      inputStreamClosed = true;
+    }
+  };
+
+  Part part;
+
+  // set to true by the close method of outputStream
+  boolean outputStreamClosed;
+
+  BufferOutputStream outputStream;
+
+  // the failure injected by the mocks in the error tests
+  IOException error = new IOException();
+
+  // presumably runs the submitted tasks synchronously in the calling thread - TODO confirm against SyncContext
+  Context context = new SyncContext();
+
+  // rewinds the inputStream, creates a new part and a new outputStream, then pumps and waits for the result
+  private void run(Context context, boolean closeOutput) throws Throwable {
+    inputStream.reset();
+    part = new InputStreamPart("name", inputStream);
+
+    outputStream = new BufferOutputStream() {
+      @Override
+      public void close() {
+        super.close();
+        outputStreamClosed = true;
+      }
+    };
+
+    new PumpFromPart(context, part).toOutputStream(outputStream, closeOutput).get();
+  }
+
+  // expects all content to be copied and both streams to be closed
+  public void do_pump_succ(Context context) throws Throwable {
+    run(context, true);
+
+    Assert.assertEquals(src, outputStream.getBuffer().toString());
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_succ() throws Throwable {
+    do_pump_succ(null);
+    do_pump_succ(context);
+  }
+
+  // expects all content to be copied and the outputStream to stay open
+  public void do_pump_outputNotClose(Context context) throws Throwable {
+    run(context, false);
+
+    Assert.assertEquals(src, outputStream.getBuffer().toString());
+    Assert.assertFalse(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_outputNotClose() throws Throwable {
+    do_pump_outputNotClose(null);
+    do_pump_outputNotClose(context);
+  }
+
+  // expects the pump to fail with an ExecutionException caused by the injected error,
+  // and both streams to be closed
+  public void pump_error(Context context) {
+    try {
+      run(context, true);
+      Assert.fail("must throw exception");
+    } catch (Throwable e) {
+      Assert.assertThat(e, Matchers.instanceOf(ExecutionException.class));
+      Assert.assertThat(e.getCause(), Matchers.sameInstance(error));
+    }
+
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_read_error() throws IOException {
+    // fails the read of the pump that runs with a context
+    new MockUp<InputStreamToReadStream>() {
+      @Mock
+      void readInWorker(Future<ReadResult> future) {
+        future.fail(error);
+      }
+    };
+    // fails the copy of the run without context, which uses IOUtils.copyLarge
+    new Expectations(IOUtils.class) {
+      {
+        IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+        result = error;
+      }
+    };
+
+    pump_error(null);
+    pump_error(context);
+  }
+
+  @Test
+  public void pump_write_error() throws IOException {
+    // fails every write to the outputStream of the pump that runs with a context
+    new MockUp<BufferOutputStream>() {
+      @Mock
+      void write(byte[] b) throws IOException {
+        throw error;
+      }
+    };
+    // fails the copy of the run without context, which uses IOUtils.copyLarge
+    new Expectations(IOUtils.class) {
+      {
+        IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+        result = error;
+      }
+    };
+
+    pump_error(null);
+    pump_error(context);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
wujimin@apache.org.