You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/06/08 12:54:14 UTC

[GitHub] wujimin closed pull request #750: [SCB-484] Servlet rest support download

wujimin closed pull request #750: [SCB-484] Servlet rest support download
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/750
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/FormProcessorCreator.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/FormProcessorCreator.java
index b60071876..78e595404 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/FormProcessorCreator.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/FormProcessorCreator.java
@@ -42,7 +42,7 @@ public FormProcessor(String paramPath, JavaType targetType) {
     }
 
     @Override
-    public Object getValue(HttpServletRequest request) throws Exception {
+    public Object getValue(HttpServletRequest request) {
       @SuppressWarnings("unchecked")
       Map<String, Object> forms = (Map<String, Object>) request.getAttribute(RestConst.FORM_PARAMETERS);
       if (forms != null) {
@@ -57,7 +57,7 @@ public Object getValue(HttpServletRequest request) throws Exception {
     }
 
     @Override
-    public void setValue(RestClientRequest clientRequest, Object arg) throws Exception {
+    public void setValue(RestClientRequest clientRequest, Object arg) {
       clientRequest.addForm(paramPath, arg);
     }
 
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
index c6bb7e94f..505d84edd 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
@@ -20,7 +20,6 @@
 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
 import static javax.ws.rs.core.MediaType.MULTIPART_FORM_DATA;
 
-import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -32,25 +31,23 @@
 import javax.servlet.http.Part;
 import javax.ws.rs.core.MediaType;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.servicecomb.common.rest.codec.RestClientRequest;
 import org.apache.servicecomb.common.rest.codec.RestObjectMapper;
 import org.apache.servicecomb.foundation.vertx.stream.BufferOutputStream;
-import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
+import org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.vertx.core.Vertx;
+import io.vertx.core.Context;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpClientRequest;
 import io.vertx.core.http.HttpHeaders;
-import io.vertx.core.streams.Pump;
 
 public class RestClientRequestImpl implements RestClientRequest {
   private static final Logger LOGGER = LoggerFactory.getLogger(RestClientRequestImpl.class);
 
-  protected Vertx vertx;
+  protected Context context;
 
   protected AsyncResponse asyncResp;
 
@@ -64,8 +61,8 @@
 
   protected Buffer bodyBuffer;
 
-  public RestClientRequestImpl(HttpClientRequest request, Vertx vertx, AsyncResponse asyncResp) {
-    this.vertx = vertx;
+  public RestClientRequestImpl(HttpClientRequest request, Context context, AsyncResponse asyncResp) {
+    this.context = context;
     this.asyncResp = asyncResp;
     this.request = request;
   }
@@ -175,30 +172,19 @@ private void attachFile(String boundary, Iterator<Entry<String, Part>> uploadsIt
     Part part = entry.getValue();
     String filename = part.getSubmittedFileName();
 
-    InputStreamToReadStream fileStream = null;
-    try {
-      fileStream = new InputStreamToReadStream(vertx, part.getInputStream());
-    } catch (IOException e) {
-      asyncResp.consumerFail(e);
-      return;
-    }
+    Buffer fileHeader = fileBoundaryInfo(boundary, name, part);
+    request.write(fileHeader);
 
-    InputStreamToReadStream finalFileStream = fileStream;
-    fileStream.exceptionHandler(e -> {
-      LOGGER.debug("Failed to sending file [{}:{}].", name, filename, e);
-      IOUtils.closeQuietly(finalFileStream.getInputStream());
-      asyncResp.consumerFail(e);
-    });
-    fileStream.endHandler(V -> {
-      LOGGER.debug("finish sending file [{}:{}].", name, filename);
-      IOUtils.closeQuietly(finalFileStream.getInputStream());
+    new PumpFromPart(context, part).toWriteStream(request).whenComplete((v, e) -> {
+      if (e != null) {
+        LOGGER.debug("Failed to sending file [{}:{}].", name, filename, e);
+        asyncResp.consumerFail(e);
+        return;
+      }
 
+      LOGGER.debug("finish sending file [{}:{}].", name, filename);
       attachFile(boundary, uploadsIterator);
     });
-
-    Buffer fileHeader = fileBoundaryInfo(boundary, name, part);
-    request.write(fileHeader);
-    Pump.pump(fileStream, request).start();
   }
 
   private Buffer boundaryEndInfo(String boundary) {
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
index c806fc0ce..1f656effe 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
@@ -69,7 +69,7 @@ public Response afterReceiveRequest(Invocation invocation, HttpServletRequestEx
 
     responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
 
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    CompletableFuture<Void> future = new CompletableFuture<>();
     try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) {
       produceProcessor.encodeResponse(output, body);
 
diff --git a/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/TestMgr.java b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/TestMgr.java
index b0086f6f5..80c5bf0f5 100644
--- a/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/TestMgr.java
+++ b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/TestMgr.java
@@ -66,6 +66,14 @@ public static void checkNotEmpty(String real) {
     }
   }
 
+  public static void failed(String desc, Throwable e) { // records a failure in errorList; e may be null
+    Error error = new Error(msg + " | " + desc + ", method is " + getCaller());
+    if (e != null) {
+      error.setStackTrace(e.getStackTrace()); // use the cause's trace; copying error's own trace was a no-op
+    }
+    errorList.add(error);
+  }
+
   public static void summary() {
     if (errorList.isEmpty()) {
       LOGGER.info("............. test finished ............");
diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java
index c782b6814..6b7313545 100644
--- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java
+++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestDownload.java
@@ -69,8 +69,9 @@ private String readFileToString(File file) {
     future.whenComplete((result, e) -> {
       Object value = result;
       if (File.class.isInstance(value)) {
-        value = readFileToString((File) value);
-        ((File) value).delete();
+        File file = (File) value;
+        value = readFileToString(file);
+        file.delete();
       } else if (byte[].class.isInstance(value)) {
         value = new String((byte[]) value);
       }
@@ -128,6 +129,7 @@ public void runRest() {
           .allOf(futures.toArray(new CompletableFuture[futures.size()]))
           .get();
     } catch (InterruptedException | ExecutionException e1) {
+      TestMgr.failed("test download failed.", e1);
     }
   }
 }
diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
new file mode 100644
index 000000000..a33756586
--- /dev/null
+++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/io/AsyncCloseable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.common.io;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface AsyncCloseable<T> { // close() whose result is delivered through a CompletableFuture
+  CompletableFuture<T> close(); // NOTE(review): presumably completes once closing has finished - confirm with implementors
+}
diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java
new file mode 100644
index 000000000..7eecbfd81
--- /dev/null
+++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.vertx.core.impl;
+
+import java.util.concurrent.Executor;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.spi.metrics.PoolMetrics;
+
+public class SyncContext extends EventLoopContext { // context that runs tasks and "blocking" code inline on the caller
+  public SyncContext() {
+    this(null);
+  }
+
+  public SyncContext(VertxInternal vertx) {
+    super(vertx, null, null, null, null, null, null);
+    if (SyncVertx.class.isInstance(vertx)) {
+      ((SyncVertx) vertx).setContext(this); // a SyncVertx adopts this instance as the context it hands out
+    }
+  }
+
+  @Override
+  public void runOnContext(Handler<Void> task) {
+    task.handle(null); // invoked directly, no scheduling
+  }
+
+  public static <T> void syncExecuteBlocking(Handler<Future<T>> blockingCodeHandler,
+      Handler<AsyncResult<T>> asyncResultHandler) {
+    Future<T> res = Future.future();
+
+    try {
+      blockingCodeHandler.handle(res);
+    } catch (Throwable e) {
+      // no early return: fall through so asyncResultHandler still observes the failure
+      res.fail(e);
+    }
+
+    res.setHandler(asyncResultHandler);
+  }
+
+  @Override
+  public <T> void executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler) {
+    syncExecuteBlocking((future) -> {
+      try {
+        future.complete(action.perform());
+      } catch (Throwable e) {
+        future.fail(e);
+      }
+    }, resultHandler);
+  }
+
+  @Override
+  public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
+      Handler<AsyncResult<T>> asyncResultHandler) {
+    syncExecuteBlocking(blockingCodeHandler, asyncResultHandler); // the ordered flag is not used
+  }
+
+  @Override
+  <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
+      Handler<AsyncResult<T>> resultHandler,
+      Executor exec, TaskQueue queue, PoolMetrics metrics) {
+    syncExecuteBlocking(blockingCodeHandler, resultHandler); // NOTE(review): action, exec, queue and metrics are ignored
+  }
+}
diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java
new file mode 100644
index 000000000..5fada598c
--- /dev/null
+++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.vertx.core.impl;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+
+/**
+ * Vertx that always hands out one fixed context (a SyncContext by default); invoke vertx.close after a test finishes.
+ */
+public class SyncVertx extends VertxImpl {
+  private ContextImpl context = new SyncContext(this);
+
+  public SyncVertx() {
+    this(null, null);
+  }
+
+  protected SyncVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) { // both arguments are ignored
+  }
+
+  @Override
+  public ContextImpl getContext() {
+    return context;
+  }
+
+  public void setContext(ContextImpl context) {
+    this.context = context;
+  }
+
+  @Override
+  public ContextImpl getOrCreateContext() {
+    return context; // never creates a new context; same instance as getContext()
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/DownloadUtils.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/DownloadUtils.java
new file mode 100644
index 000000000..8f5bffc25
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/DownloadUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.io.IOException;
+
+import javax.servlet.http.Part;
+
+import org.apache.servicecomb.foundation.common.http.HttpUtils;
+import org.apache.servicecomb.foundation.common.part.FilePartForSend;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.http.HttpHeaders;
+
+/**
+ * Internal API: static helpers for preparing download response headers and cleaning up a sent Part.
+ */
+public final class DownloadUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
+
+  private DownloadUtils() {
+  }
+
+  public static void prepareDownloadHeader(HttpServletResponseEx responseEx, Part part) { // only fills headers not already set
+    if (responseEx.getHeader(HttpHeaders.CONTENT_LENGTH.toString()) == null) {
+      responseEx.setChunked(true);
+    }
+
+    if (responseEx.getHeader(HttpHeaders.CONTENT_TYPE.toString()) == null) {
+      responseEx.setHeader(HttpHeaders.CONTENT_TYPE.toString(), part.getContentType());
+    }
+
+    if (responseEx.getHeader(javax.ws.rs.core.HttpHeaders.CONTENT_DISPOSITION) == null) {
+      // to support Chinese characters and spaces in file names in Firefox,
+      // "filename*" must be used (https://tools.ietf.org/html/rfc6266)
+      String encodedFileName = HttpUtils.uriEncodePath(part.getSubmittedFileName());
+      responseEx.setHeader(javax.ws.rs.core.HttpHeaders.CONTENT_DISPOSITION,
+          "attachment;filename=" + encodedFileName + ";filename*=utf-8''" + encodedFileName);
+    }
+  }
+
+  public static void clearPartResource(Part part) {
+    if (FilePartForSend.class.isInstance(part) && ((FilePartForSend) part).isDeleteAfterFinished()) { // other parts are left alone
+      try {
+        part.delete();
+      } catch (IOException e) {
+        LOGGER.error("Failed to delete temp file: {}.", ((FilePartForSend) part).getAbsolutePath(), e); // logged, not rethrown
+      }
+    }
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
index a1ee34ec9..e1c2158d7 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java
@@ -23,6 +23,8 @@
 import javax.servlet.http.Part;
 import javax.ws.rs.core.Response.StatusType;
 
+import io.vertx.core.http.HttpHeaders;
+
 public interface HttpServletResponseEx extends HttpServletResponse, BodyBufferSupport {
   StatusType getStatusType();
 
@@ -31,4 +33,8 @@
   Object getAttribute(String key);
 
   CompletableFuture<Void> sendPart(Part body);
+
+  default void setChunked(boolean chunked) { // NOTE(review): the chunked argument is ignored by this default
+    setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), HttpHeaders.CHUNKED.toString()); // header is set even when chunked is false
+  }
 }
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
index ddc56c9e3..bb922b2a6 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -26,6 +26,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.servicecomb.foundation.common.http.HttpUtils;
 import org.apache.servicecomb.foundation.common.part.AbstractPart;
+import org.apache.servicecomb.foundation.vertx.stream.PumpCommon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@
 import io.vertx.core.file.AsyncFile;
 import io.vertx.core.file.OpenOptions;
 import io.vertx.core.http.HttpClientResponse;
-import io.vertx.core.streams.Pump;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
 
@@ -74,33 +74,32 @@ public ReadStreamPart(Context context, ReadStream<Buffer> readStream) {
     readStream.pause();
   }
 
-  public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-
-    writeStream.exceptionHandler(future::completeExceptionally);
-    readStream.exceptionHandler(future::completeExceptionally);
-    readStream.endHandler(future::complete);
+  public Context getContext() {
+    return context;
+  }
 
-    // if readStream(HttpClientResponse) and writeStream(HttpServerResponse)
-    // belongs to difference eventloop
-    // maybe will cause deadlock
-    // if happened, vertx will print deadlock stacks
-    Pump.pump(readStream, writeStream).start();
-    readStream.resume();
+  public ReadStream<Buffer> getReadStream() {
+    return readStream;
+  }
 
-    return future;
+  /**
+   *
+   * @param writeStream
+   * @return future of save action<br>
+   *
+   * important: WriteStream does not provide an endHandler, so we cannot know when the write has really finished.
+   * The returned future therefore only means that reading from readStream has finished.
+   */
+  public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
+    return new PumpCommon().pump(context, readStream, writeStream);
   }
 
   public CompletableFuture<byte[]> saveAsBytes() {
-    return saveAs(buf -> {
-      return buf.getBytes();
-    });
+    return saveAs(buf -> buf.getBytes());
   }
 
   public CompletableFuture<String> saveAsString() {
-    return saveAs(buf -> {
-      return buf.toString();
-    });
+    return saveAs(buf -> buf.toString());
   }
 
   public <T> CompletableFuture<T> saveAs(Function<Buffer, T> converter) {
@@ -109,14 +108,17 @@ public ReadStreamPart(Context context, ReadStream<Buffer> readStream) {
 
     readStream.exceptionHandler(future::completeExceptionally);
     readStream.handler(buffer::appendBuffer);
-    readStream.endHandler(v -> {
-      future.complete(converter.apply(buffer));
-    });
+    readStream.endHandler(v -> future.complete(converter.apply(buffer)));
     readStream.resume();
 
     return future;
   }
 
+  /**
+   *
+   * @param fileName
+   * @return future of save to file, future complete means write to file finished
+   */
   public CompletableFuture<File> saveToFile(String fileName) {
     File file = new File(fileName);
     file.getParentFile().mkdirs();
@@ -124,12 +126,20 @@ public ReadStreamPart(Context context, ReadStream<Buffer> readStream) {
     return saveToFile(file, openOptions);
   }
 
+  /**
+   *
+   * @param file
+   * @param openOptions
+   * @return future of save to file, future complete means write to file finished
+   */
   public CompletableFuture<File> saveToFile(File file, OpenOptions openOptions) {
     CompletableFuture<File> future = new CompletableFuture<>();
 
-    Vertx vertx = context.owner();
-    vertx.fileSystem().open(file.getAbsolutePath(), openOptions, ar -> {
-      onFileOpened(file, ar, future);
+    context.runOnContext((v) -> {
+      Vertx vertx = context.owner();
+      vertx.fileSystem().open(file.getAbsolutePath(), openOptions, ar -> {
+        onFileOpened(file, ar, future);
+      });
     });
 
     return future;
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
index 13c0ba710..2050601bb 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.foundation.vertx.http;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -28,7 +29,10 @@
 import javax.ws.rs.core.Response.StatusType;
 
 import org.apache.servicecomb.foundation.common.http.HttpStatus;
+import org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
 
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 
 public class StandardHttpServletResponseEx extends HttpServletResponseWrapper implements HttpServletResponseEx {
@@ -99,7 +103,20 @@ public Object getAttribute(String key) {
   }
 
   @Override
-  public CompletableFuture<Void> sendPart(Part body) {
-    throw new Error("not supported method");
+  public CompletableFuture<Void> sendPart(Part part) {
+    DownloadUtils.prepareDownloadHeader(this, part);
+
+    OutputStream outputStream;
+    try {
+      outputStream = getOutputStream();
+    } catch (IOException e) {
+      CompletableFuture<Void> future = new CompletableFuture<>();
+      future.completeExceptionally(e);
+      return future;
+    }
+
+    // if context is null, then will switch to sync logic
+    Context context = Vertx.currentContext();
+    return new PumpFromPart(context, part).toOutputStream(outputStream, false);
   }
 }
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index 75bf693dc..0d12c6522 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.foundation.vertx.http;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -27,22 +25,14 @@
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.StatusType;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.servicecomb.foundation.common.http.HttpStatus;
-import org.apache.servicecomb.foundation.common.http.HttpUtils;
-import org.apache.servicecomb.foundation.common.part.FilePartForSend;
-import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
 
 import io.vertx.core.Context;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServerResponse;
-import io.vertx.core.streams.Pump;
 
 public class VertxServerResponseToHttpServletResponse extends AbstractHttpServletResponse {
-  private static final Logger LOGGER = LoggerFactory.getLogger(VertxServerResponseToHttpServletResponse.class);
-
   private Context context;
 
   private HttpServerResponse serverResponse;
@@ -112,7 +102,7 @@ public String getHeader(String name) {
   }
 
   @Override
-  public void flushBuffer() throws IOException {
+  public void flushBuffer() {
     if (context == Vertx.currentContext()) {
       internalFlushBuffer();
       return;
@@ -134,60 +124,13 @@ public void internalFlushBuffer() {
 
   @Override
   public CompletableFuture<Void> sendPart(Part part) {
-    prepareSendPartHeader(part);
-
-    if (ReadStreamPart.class.isInstance(part)) {
-      return ((ReadStreamPart) part).saveToWriteStream(this.serverResponse);
-    }
-
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    try {
-      InputStream is = part.getInputStream();
-      context.runOnContext(v -> {
-        InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context.owner(), is);
-        inputStreamToReadStream.exceptionHandler(t -> {
-          clearPartResource(part, is);
-          future.completeExceptionally(t);
-        });
-        inputStreamToReadStream.endHandler(V -> {
-          clearPartResource(part, is);
-          future.complete(null);
-        });
-        Pump.pump(inputStreamToReadStream, serverResponse).start();
-      });
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-    }
-
-    return future;
-  }
+    DownloadUtils.prepareDownloadHeader(this, part);
 
-  protected void prepareSendPartHeader(Part part) {
-    if (!serverResponse.headers().contains(HttpHeaders.CONTENT_LENGTH)) {
-      serverResponse.setChunked(true);
-    }
-
-    if (!serverResponse.headers().contains(HttpHeaders.CONTENT_TYPE)) {
-      serverResponse.putHeader(HttpHeaders.CONTENT_TYPE, part.getContentType());
-    }
-
-    if (!serverResponse.headers().contains(HttpHeaders.CONTENT_DISPOSITION)) {
-      // to support chinese and space filename in firefox
-      // must use "filename*", (https://tools.ietf.org/html/rtf6266)
-      String encodedFileName = HttpUtils.uriEncodePath(part.getSubmittedFileName());
-      serverResponse.putHeader(HttpHeaders.CONTENT_DISPOSITION,
-          "attachment;filename=" + encodedFileName + ";filename*=utf-8''" + encodedFileName);
-    }
+    return new PumpFromPart(context, part).toWriteStream(serverResponse);
   }
 
-  protected void clearPartResource(Part part, InputStream is) {
-    IOUtils.closeQuietly(is);
-    if (FilePartForSend.class.isInstance(part) && ((FilePartForSend) part).isDeleteAfterFinished()) {
-      try {
-        part.delete();
-      } catch (IOException e) {
-        LOGGER.error("Failed to delete temp file: {}.", ((FilePartForSend) part).getAbsolutePath(), e);
-      }
-    }
+  @Override
+  public void setChunked(boolean chunked) {
+    serverResponse.setChunked(chunked);
   }
 }
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
index 2066dfe9c..01d65869e 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
@@ -25,9 +25,9 @@
 
 import io.netty.buffer.Unpooled;
 import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.streams.ReadStream;
 
@@ -36,7 +36,7 @@
 
   public static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
 
-  private Vertx vertx;
+  private Context context;
 
   private InputStream inputStream;
 
@@ -54,14 +54,15 @@
 
   private Handler<Void> endHandler;
 
-  public InputStreamToReadStream(Vertx vertx, InputStream inputStream) {
-    this.vertx = vertx;
+  private boolean autoCloseInputStream;
+
+  public InputStreamToReadStream(Context context, InputStream inputStream,
+      boolean autoCloseInputStream) {
+    this.context = context;
     this.inputStream = inputStream;
+    this.autoCloseInputStream = autoCloseInputStream;
   }
 
-  public InputStream getInputStream() {
-    return inputStream;
-  }
 
   public synchronized InputStreamToReadStream readBufferSize(int readBufferSize) {
     this.readBufferSize = readBufferSize;
@@ -113,8 +114,8 @@ private synchronized void doRead() {
     if (!readInProgress) {
       readInProgress = true;
 
-      vertx.executeBlocking(this::readInWorker,
-          false,
+      context.executeBlocking(this::readInWorker,
+          true,
           this::afterReadInEventloop);
     }
   }
@@ -129,9 +130,14 @@ private synchronized void readInWorker(Future<ReadResult> future) {
     }
   }
 
+  private void handleException(Throwable e) {
+    closeInputStream();
+    exceptionHandler.handle(e);
+  }
+
   private synchronized void afterReadInEventloop(AsyncResult<ReadResult> ar) {
     if (ar.failed()) {
-      exceptionHandler.handle(ar.cause());
+      handleException(ar.cause());
       return;
     }
 
@@ -175,11 +181,25 @@ private synchronized void handleData(Buffer buffer) {
 
   private synchronized void handleEnd() {
     dataHandler = null;
+    closeInputStream();
     if (endHandler != null) {
       endHandler.handle(null);
     }
   }
 
+  private void closeInputStream() {
+    closed = true;
+    if (!autoCloseInputStream) {
+      return;
+    }
+
+    try {
+      inputStream.close();
+    } catch (IOException e) {
+      LOGGER.error("failed to close inputSteam.", e);
+    }
+  }
+
   @Override
   public ReadStream<Buffer> endHandler(Handler<Void> handler) {
     check();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
new file mode 100644
index 000000000..9e697598b
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Adapts a blocking {@link OutputStream} to a vert.x {@link WriteStream}, for pump from a readStream.
+ * <p>
+ * {@link #write(Buffer)} queues the buffer and schedules an ordered blocking task on the context;
+ * the task drains the queue into the outputStream. {@code currentBufferCount}, {@code closedDeferred}
+ * and the drain settings are guarded by the monitor of this instance.
+ */
+public class OutputStreamToWriteStream implements WriteStream<Buffer>, AsyncCloseable<Void> {
+  private static final int DEFAULT_MAX_BUFFERS = 4;
+
+  private static final int SMALLEST_MAX_BUFFERS = 2;
+
+  private final OutputStream outputStream;
+
+  private final Context context;
+
+  private final boolean autoCloseOutputStream;
+
+  private Handler<Throwable> exceptionHandler;
+
+  // resume readStream
+  private Handler<Void> drainHandler;
+
+  // when close is invoked before all queued data is written, the close logic is deferred to closedDeferred
+  private Runnable closedDeferred;
+
+  private boolean closed;
+
+  // buffers.size() has to traverse every node, and may be inaccurate under concurrent modification
+  // we only need the size for flow control by pump, so it is tracked separately in currentBufferCount
+  private final Queue<Buffer> buffers = new ConcurrentLinkedQueue<>();
+
+  private int currentBufferCount;
+
+  // only indicates whether buffers is full, does not restrict adding
+  // must be >= SMALLEST_MAX_BUFFERS
+  // a smaller value is replaced by SMALLEST_MAX_BUFFERS
+  private int maxBuffers = DEFAULT_MAX_BUFFERS;
+
+  // if currentBufferCount <= drainMark, drainHandler is invoked to resume readStream
+  private int drainMark = maxBuffers / 2;
+
+  /**
+   * @param context vert.x context used to run the blocking writes
+   * @param outputStream target of all written buffers
+   * @param autoCloseOutputStream whether closing this stream also closes outputStream
+   */
+  public OutputStreamToWriteStream(Context context, OutputStream outputStream,
+      boolean autoCloseOutputStream) {
+    this.context = context;
+    this.outputStream = outputStream;
+    this.autoCloseOutputStream = autoCloseOutputStream;
+  }
+
+  @Override
+  public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
+    this.exceptionHandler = handler;
+    return this;
+  }
+
+  private void handleException(Throwable t) {
+    if (exceptionHandler != null) {
+      exceptionHandler.handle(t);
+    }
+  }
+
+  /**
+   * Queues data and schedules a blocking task that writes all queued buffers to the outputStream.
+   * A write failure is reported to the exceptionHandler, if one is set.
+   */
+  @Override
+  public synchronized WriteStream<Buffer> write(Buffer data) {
+    currentBufferCount++;
+    buffers.add(data);
+    context.executeBlocking(this::writeInWorker,
+        true,
+        ar -> {
+          if (ar.failed()) {
+            handleException(ar.cause());
+          }
+        });
+    return this;
+  }
+
+  /**
+   * Drains the buffer queue into the outputStream; completes the future when the queue is empty,
+   * fails it on the first IOException.
+   */
+  protected void writeInWorker(Future<Object> future) {
+    while (true) {
+      Buffer buffer = buffers.poll();
+      if (buffer == null) {
+        future.complete();
+        return;
+      }
+
+      try {
+        outputStream.write(buffer.getBytes());
+      } catch (IOException e) {
+        // the failed buffer is consumed too; do the bookkeeping under the lock like the success path,
+        // otherwise a deferred close would wait forever for currentBufferCount to reach 0
+        onBufferConsumed(false);
+        future.fail(e);
+        return;
+      }
+
+      onBufferConsumed(true);
+    }
+  }
+
+  // bookkeeping after a buffer left the queue: runs the deferred close once nothing is pending,
+  // otherwise, after a successful write, gives the readStream a chance to resume
+  private synchronized void onBufferConsumed(boolean written) {
+    currentBufferCount--;
+    if (currentBufferCount == 0 && closedDeferred != null) {
+      closedDeferred.run();
+      return;
+    }
+
+    if (written) {
+      checkDrained();
+    }
+  }
+
+  @Override
+  public void end() {
+    close();
+  }
+
+  @Override
+  public synchronized WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
+    this.maxBuffers = Math.max(maxSize, SMALLEST_MAX_BUFFERS);
+    this.drainMark = maxBuffers / 2;
+    return this;
+  }
+
+  @Override
+  public synchronized boolean writeQueueFull() {
+    return currentBufferCount >= maxBuffers;
+  }
+
+  @Override
+  public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
+    this.drainHandler = handler;
+    return this;
+  }
+
+  // the drainHandler is one-shot: it is cleared before being invoked
+  private synchronized void checkDrained() {
+    if (drainHandler != null && currentBufferCount <= drainMark) {
+      Handler<Void> handler = drainHandler;
+      drainHandler = null;
+      handler.handle(null);
+    }
+  }
+
+  /**
+   * Closes this stream; if buffers are still pending, the close is deferred until they are consumed.
+   *
+   * @return future completed after the close logic ran
+   * @throws IllegalStateException if this stream is already closed
+   */
+  @Override
+  public CompletableFuture<Void> close() {
+    return closeInternal();
+  }
+
+  private void check() {
+    checkClosed();
+  }
+
+  private void checkClosed() {
+    if (closed) {
+      throw new IllegalStateException(this.getClass().getName() + " is closed");
+    }
+  }
+
+  private synchronized CompletableFuture<Void> closeInternal() {
+    check();
+
+    closed = true;
+
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    if (currentBufferCount == 0) {
+      doClose(future);
+    } else {
+      closedDeferred = () -> doClose(future);
+    }
+    return future;
+  }
+
+  private void doClose(CompletableFuture<Void> future) {
+    if (autoCloseOutputStream) {
+      try {
+        outputStream.close();
+      } catch (IOException e) {
+        future.completeExceptionally(new IllegalStateException("failed to close outputStream.", e));
+        return;
+      }
+    }
+
+    future.complete(null);
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
new file mode 100644
index 000000000..156510a94
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Pumps a {@link ReadStream} into a {@link WriteStream} and reports completion through a future.
+ */
+public class PumpCommon {
+  /**
+   * Starts a pump from readStream to writeStream and resumes readStream on the given context.
+   *
+   * @param context context on which readStream is resumed
+   * @param readStream source of the data
+   * @param writeStream target of the data
+   * @return future of save action<br>
+   * <p>important:
+   * <p>  if writeStream is AsyncCloseable, future means write complete
+   * <p>  if writeStream is not AsyncCloseable, future only means read complete
+   */
+  @SuppressWarnings("unchecked")
+  public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
+    CompletableFuture<Void> readFuture = new CompletableFuture<>();
+
+    writeStream.exceptionHandler(readFuture::completeExceptionally);
+    readStream.exceptionHandler(readFuture::completeExceptionally);
+    // just means read finished, not means write finished
+    readStream.endHandler(readFuture::complete);
+
+    // if readStream(HttpClientResponse) and writeStream(HttpServerResponse)
+    // belong to different eventloops
+    // this may cause a deadlock
+    // if that happens, vertx will print the deadlock stacks
+    Pump.pump(readStream, writeStream).start();
+    try {
+      context.runOnContext(v -> readStream.resume());
+    } catch (Throwable e) {
+      readFuture.completeExceptionally(e);
+    }
+
+    if (!(writeStream instanceof AsyncCloseable)) {
+      return readFuture;
+    }
+
+    return closeWriteStream((AsyncCloseable<Void>) writeStream, readFuture);
+  }
+
+  /**
+   * Closes writeStream after readFuture completed, whether normally or exceptionally.
+   *
+   * @return future completed when both readFuture and the close of writeStream are completed
+   */
+  protected CompletableFuture<Void> closeWriteStream(AsyncCloseable<Void> writeStream,
+      CompletableFuture<Void> readFuture) {
+    CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+    readFuture.whenComplete((v, e) -> {
+      try {
+        writeStream.close().whenComplete((wv, we) -> {
+          if (we != null) {
+            writeFuture.completeExceptionally(we);
+            return;
+          }
+
+          writeFuture.complete(null);
+        });
+      } catch (RuntimeException closeError) {
+        // close() itself failed, without this writeFuture would never complete
+        writeFuture.completeExceptionally(closeError);
+      }
+    });
+
+    return CompletableFuture.allOf(readFuture, writeFuture);
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
new file mode 100644
index 000000000..348321c65
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.servicecomb.foundation.vertx.http.DownloadUtils;
+import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
+
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * Pumps the content of a {@link Part} to a {@link WriteStream} or to an {@link OutputStream}.
+ */
+public class PumpFromPart {
+  private final Context context;
+
+  private final Part part;
+
+  /**
+   * @param context vert.x context used for the pump; when null, {@link #toOutputStream} copies synchronously
+   * @param part source of the data
+   */
+  public PumpFromPart(Context context, Part part) {
+    this.context = context;
+    this.part = part;
+  }
+
+  // uses the readStream of a ReadStreamPart directly, wraps the inputStream of any other part
+  private CompletableFuture<ReadStream<Buffer>> prepareReadStream() {
+    CompletableFuture<ReadStream<Buffer>> future = new CompletableFuture<>();
+
+    if (part instanceof ReadStreamPart) {
+      future.complete(((ReadStreamPart) part).getReadStream());
+      return future;
+    }
+
+    try {
+      InputStream inputStream = part.getInputStream();
+      InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context, inputStream, true);
+      // paused here, resumed by PumpCommon.pump after the pump is started
+      inputStreamToReadStream.pause();
+      future.complete(inputStreamToReadStream);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+
+    return future;
+  }
+
+  /**
+   * Pumps the part to writeStream; the part resource is cleared when the pump is finished or failed.
+   */
+  public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+    return prepareReadStream()
+        .thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream))
+        .whenComplete((v, e) -> DownloadUtils.clearPartResource(part));
+  }
+
+  /**
+   * Writes the part to outputStream: by a blocking copy when there is no context, by pump otherwise.
+   *
+   * @param autoCloseOutputStream whether outputStream is closed after the transfer
+   */
+  public CompletableFuture<Void> toOutputStream(OutputStream outputStream, boolean autoCloseOutputStream) {
+    if (context == null) {
+      return toOutputStreamSync(outputStream, autoCloseOutputStream);
+    }
+
+    return toOutputStreamAsync(outputStream, autoCloseOutputStream);
+  }
+
+  private CompletableFuture<Void> toOutputStreamAsync(OutputStream outputStream, boolean autoCloseOutputStream) {
+    OutputStreamToWriteStream outputStreamToWriteStream = new OutputStreamToWriteStream(context, outputStream,
+        autoCloseOutputStream);
+    return toWriteStream(outputStreamToWriteStream);
+  }
+
+  // DO NOT use a mocked sync context to unify the pump logic
+  // otherwise when pump big stream, will cause stack overflow
+  private CompletableFuture<Void> toOutputStreamSync(OutputStream outputStream, boolean autoCloseOutputStream) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    Throwable failure = null;
+
+    try (InputStream inputStream = part.getInputStream()) {
+      IOUtils.copyLarge(inputStream, outputStream);
+    } catch (Throwable e) {
+      failure = e;
+    }
+
+    // the outputStream is closed even if the copy failed
+    if (autoCloseOutputStream) {
+      try {
+        outputStream.close();
+      } catch (Throwable e) {
+        if (failure == null) {
+          failure = e;
+        } else if (failure != e) {
+          // the copy failure stays the primary error, the close failure is attached to it
+          failure.addSuppressed(e);
+        }
+      }
+    }
+
+    if (failure == null) {
+      future.complete(null);
+    } else {
+      future.completeExceptionally(failure);
+    }
+    return future;
+  }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
index 0f9e730d9..0e50c68ba 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -28,30 +28,20 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
 import org.hamcrest.Matchers;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Context;
-import io.vertx.core.Future;
 import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
-import io.vertx.core.file.AsyncFile;
-import io.vertx.core.file.FileSystem;
 import io.vertx.core.file.FileSystemException;
 import io.vertx.core.file.OpenOptions;
-import io.vertx.core.file.impl.AsyncFileUitls;
-import io.vertx.core.file.impl.FileSystemImpl;
-import io.vertx.core.file.impl.WindowsFileSystem;
 import io.vertx.core.http.HttpClientResponse;
 import io.vertx.core.impl.ContextImpl;
-import io.vertx.core.impl.EventLoopContext;
-import io.vertx.core.impl.Utils;
-import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.SyncVertx;
 import io.vertx.core.streams.WriteStream;
 import mockit.Expectations;
 import mockit.Mock;
@@ -59,93 +49,30 @@
 import mockit.Mocked;
 
 public class TestReadStreamPart {
-  @Mocked
-  VertxInternal vertx;
+  static SyncVertx vertx = new SyncVertx();
 
-  //  @Mocked
-  ContextImpl context;
+  static ContextImpl context = vertx.getContext();
 
-  String src = "src";
+  static String src = "src";
 
-  InputStreamToReadStream readStream;
+  static InputStream inputStream = new ByteArrayInputStream(src.getBytes());
 
-  ReadStreamPart part;
+  InputStreamToReadStream readStream = new InputStreamToReadStream(context, inputStream, true);
 
-  InputStream inputStream = new ByteArrayInputStream(src.getBytes());
+  ReadStreamPart part = new ReadStreamPart(context, readStream);
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  FileSystem fileSystem;
-
-  protected FileSystem getFileSystem() {
-    return Utils.isWindows() ? new WindowsFileSystem(vertx) : new FileSystemImpl(vertx);
-  }
 
   @Before
-  public void setup() {
-    new MockUp<Vertx>(vertx) {
-      @Mock
-      FileSystem fileSystem() {
-        return fileSystem;
-      }
-
-      @Mock
-      ContextImpl getContext() {
-        return context;
-      }
-
-      @Mock
-      ContextImpl getOrCreateContext() {
-        return context;
-      }
-
-      @Mock
-      <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
-          Handler<AsyncResult<T>> resultHandler) {
-        Future<T> future = Future.future();
-        blockingCodeHandler.handle(future);
-        future.setHandler(resultHandler);
-      }
-    };
-
-    context = new EventLoopContext(vertx, null, null, null, "id", null, null);
-    new MockUp<Context>(context) {
-      @Mock
-      Vertx owner() {
-        return vertx;
-      }
-
-      @Mock
-      void runOnContext(Handler<Void> task) {
-        task.handle(null);
-      }
-
-      @Mock
-      <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
-        Future<T> future = Future.future();
-        blockingCodeHandler.handle(future);
-        future.setHandler(resultHandler);
-      }
-    };
-
-    fileSystem = getFileSystem();
-
-    readStream = new InputStreamToReadStream(vertx, inputStream);
-    part = new ReadStreamPart(context, readStream);
+  public void setup() throws IOException {
+    inputStream.reset();
+  }
 
-    new MockUp<FileSystem>(fileSystem) {
-      @Mock
-      FileSystem open(String path, OpenOptions options, Handler<AsyncResult<AsyncFile>> handler) {
-        try {
-          AsyncFile asyncFile = AsyncFileUitls.createAsyncFile(vertx, path, options, context);
-          handler.handle(Future.succeededFuture(asyncFile));
-        } catch (Exception e) {
-          handler.handle(Future.failedFuture(e));
-        }
-        return fileSystem;
-      }
-    };
+  @AfterClass
+  public static void teardown() {
+    vertx.close();
   }
 
   @Test
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
index 535bd8f68..ac6811925 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java
@@ -17,12 +17,17 @@
 
 package org.apache.servicecomb.foundation.vertx.http;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 import javax.servlet.ServletOutputStream;
 import javax.servlet.WriteListener;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
 
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.servicecomb.foundation.common.part.InputStreamPart;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -31,6 +36,7 @@
 import org.junit.rules.ExpectedException;
 
 import io.vertx.core.buffer.Buffer;
+import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
@@ -132,9 +138,42 @@ public void attribute() {
   }
 
   @Test
-  public void sendPart() {
-    setExceptionExpected();
+  public void sendPart_succ() throws Throwable {
+    String src = RandomStringUtils.random(100);
+    InputStream inputStream = new ByteArrayInputStream(src.getBytes());
+    Part part = new InputStreamPart("name", inputStream);
+    Buffer buffer = Buffer.buffer();
+    ServletOutputStream outputStream = new MockUp<ServletOutputStream>() {
+      @Mock
+      void write(int b) {
+        buffer.appendByte((byte) b);
+      }
+    }.getMockInstance();
+
+    new Expectations() {
+      {
+        response.getOutputStream();
+        result = outputStream;
+      }
+    };
+
+    responseEx.sendPart(part).get();
+
+    Assert.assertEquals(src, buffer.toString());
+  }
+
+  @Test
+  public void sendPart_failed(@Mocked Part part) throws Throwable {
+    Error error = new Error();
+    new Expectations() {
+      {
+        response.getOutputStream();
+        result = error;
+      }
+    };
+
+    expectedException.expect(Matchers.sameInstance(error));
 
-    responseEx.sendPart(null);
+    responseEx.sendPart(part).get();
   }
 }
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index 0626dab5c..8ae82dbf6 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -31,6 +31,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.servicecomb.foundation.common.http.HttpStatus;
 import org.apache.servicecomb.foundation.common.part.FilePart;
+import org.apache.servicecomb.foundation.vertx.stream.PumpFromPart;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,6 +47,7 @@
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.impl.SyncContext;
 import io.vertx.core.impl.VertxImpl;
 import io.vertx.core.streams.WriteStream;
 import mockit.Deencapsulation;
@@ -150,20 +152,15 @@ void runOnContext(Handler<Void> action) {
         action.handle(null);
       }
 
-      @Mock
-      Vertx owner() {
-        return vertx;
-      }
-    };
-
-    new MockUp<Vertx>(vertx) {
       @Mock
       <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
           Handler<AsyncResult<T>> resultHandler) {
-        Future<T> future = Future.future();
-        future.setHandler(resultHandler);
+        SyncContext.syncExecuteBlocking(blockingCodeHandler, resultHandler);
+      }
 
-        blockingCodeHandler.handle(future);
+      @Mock
+      Vertx owner() {
+        return vertx;
       }
     };
 
@@ -308,7 +305,7 @@ public void prepareSendPartHeader_update(@Mocked Part part) {
         result = "测     试";
       }
     };
-    response.prepareSendPartHeader(part);
+    DownloadUtils.prepareDownloadHeader(response, part);
 
     Assert.assertTrue(serverResponse.isChunked());
     Assert.assertEquals("type", response.getHeader(HttpHeaders.CONTENT_TYPE));
@@ -323,7 +320,7 @@ public void prepareSendPartHeader_notUpdate(@Mocked Part part) {
     headers.add(HttpHeaders.CONTENT_TYPE, "type");
     headers.add(HttpHeaders.CONTENT_DISPOSITION, "disposition");
 
-    response.prepareSendPartHeader(part);
+    DownloadUtils.prepareDownloadHeader(response, part);
 
     Assert.assertFalse(serverResponse.isChunked());
     Assert.assertEquals("type", response.getHeader(HttpHeaders.CONTENT_TYPE));
@@ -372,13 +369,12 @@ public void sendPart_inputStreamBreak(@Mocked Part part, @Mocked InputStream inp
 
   @SuppressWarnings("unchecked")
   @Test
-  public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part)
-      throws IOException, InterruptedException, ExecutionException {
+  public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    new Expectations() {
-      {
-        part.saveToWriteStream((WriteStream<Buffer>) any);
-        result = future;
+    new MockUp<PumpFromPart>() {
+      @Mock
+      CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+        return future;
       }
     };
 
@@ -409,7 +405,7 @@ public void clearPartResource_deleteFile() throws IOException {
     FilePart part = new FilePart(null, file).setDeleteAfterFinished(true);
 
     Assert.assertTrue(file.exists());
-    response.clearPartResource(part, part.getInputStream());
+    DownloadUtils.clearPartResource(part);
     Assert.assertFalse(file.exists());
   }
 
@@ -420,7 +416,7 @@ public void clearPartResource_notDeleteFile() throws IOException {
     FilePart part = new FilePart(null, file);
 
     Assert.assertTrue(file.exists());
-    response.clearPartResource(part, part.getInputStream());
+    DownloadUtils.clearPartResource(part);
     Assert.assertTrue(file.exists());
 
     file.delete();
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
new file mode 100644
index 000000000..96899e85a
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.http.Part;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.servicecomb.foundation.common.part.InputStreamPart;
+import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream.ReadResult;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.impl.SyncContext;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+
+public class TestPumpFromPart {
+  String src = RandomStringUtils.random(100);
+
+  boolean inputStreamClosed;
+
+  // encoded explicitly, so that the bytes do not depend on the platform default charset
+  InputStream inputStream = new ByteArrayInputStream(src.getBytes(StandardCharsets.UTF_8)) {
+    @Override
+    public void close() throws IOException {
+      super.close();
+      inputStreamClosed = true;
+    }
+  };
+
+  Part part;
+
+  boolean outputStreamClosed;
+
+  BufferOutputStream outputStream;
+
+  IOException error = new IOException();
+
+  Context context = new SyncContext();
+
+  private void run(Context context, boolean closeOutput) throws Throwable {
+    // every run starts from a clean state, otherwise the asserts of a second run in the same test
+    // would be satisfied by the flags left over from the first run
+    inputStreamClosed = false;
+    outputStreamClosed = false;
+    inputStream.reset();
+    part = new InputStreamPart("name", inputStream);
+
+    outputStream = new BufferOutputStream() {
+      @Override
+      public void close() {
+        super.close();
+        outputStreamClosed = true;
+      }
+    };
+
+    new PumpFromPart(context, part).toOutputStream(outputStream, closeOutput).get();
+  }
+
+  public void do_pump_succ(Context context) throws Throwable {
+    run(context, true);
+
+    Assert.assertEquals(src, outputStream.getBuffer().toString());
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_succ() throws Throwable {
+    do_pump_succ(null);
+    do_pump_succ(context);
+  }
+
+  public void do_pump_outputNotClose(Context context) throws Throwable {
+    run(context, false);
+
+    Assert.assertEquals(src, outputStream.getBuffer().toString());
+    Assert.assertFalse(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_outputNotClose() throws Throwable {
+    do_pump_outputNotClose(null);
+    do_pump_outputNotClose(context);
+  }
+
+  public void pump_error(Context context) {
+    try {
+      run(context, true);
+      Assert.fail("must throw exception");
+    } catch (Throwable e) {
+      Assert.assertThat(e, Matchers.instanceOf(ExecutionException.class));
+      Assert.assertThat(e.getCause(), Matchers.sameInstance(error));
+    }
+
+    Assert.assertTrue(inputStreamClosed);
+    Assert.assertTrue(outputStreamClosed);
+  }
+
+  @Test
+  public void pump_read_error() throws IOException {
+    new MockUp<InputStreamToReadStream>() {
+      @Mock
+      void readInWorker(Future<ReadResult> future) {
+        future.fail(error);
+      }
+    };
+    new Expectations(IOUtils.class) {
+      {
+        IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+        result = error;
+      }
+    };
+
+    pump_error(null);
+    pump_error(context);
+  }
+
+  @Test
+  public void pump_write_error() throws IOException {
+    new MockUp<BufferOutputStream>() {
+      @Mock
+      void write(byte[] b) throws IOException {
+        throw error;
+      }
+    };
+    new Expectations(IOUtils.class) {
+      {
+        IOUtils.copyLarge((InputStream) any, (OutputStream) any);
+        result = error;
+      }
+    };
+
+    pump_error(null);
+    pump_error(context);
+  }
+}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 180c46f51..144ca0b7e 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -84,7 +84,7 @@ public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Except
     createRequest(ipPort, path);
     clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
     RestClientRequestImpl restClientRequest =
-        new RestClientRequestImpl(clientRequest, httpClientWithContext.context().owner(), asyncResp);
+        new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp);
     invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);
 
     Buffer requestBodyBuffer = restClientRequest.getBodyBuffer();
diff --git a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/RestServlet.java b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/RestServlet.java
index b16c0cc8c..0fb338581 100644
--- a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/RestServlet.java
+++ b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/RestServlet.java
@@ -17,8 +17,6 @@
 
 package org.apache.servicecomb.transport.rest.servlet;
 
-import java.io.IOException;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -45,8 +43,7 @@ public void init() throws ServletException {
   }
 
   @Override
-  public void service(final HttpServletRequest request,
-      final HttpServletResponse response) throws ServletException, IOException {
+  public void service(final HttpServletRequest request, final HttpServletResponse response) {
     servletRestServer.service(request, response);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services