You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/01/14 06:28:53 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1717] Fix returning nothing when file upload execption occurs

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b39906  [SCB-1717] Fix returning nothing when file upload execption occurs
6b39906 is described below

commit 6b39906aed6bd8d34ec3449af6c0ce90344d5c65
Author: AngLi2 <li...@qq.com>
AuthorDate: Sat Jan 11 16:03:09 2020 +0800

    [SCB-1717] Fix returning nothing when file upload execption occurs
---
 .../common/rest/codec/param/RestClientRequestImpl.java        | 11 ++++++++++-
 .../servicecomb/foundation/vertx/http/ReadStreamPart.java     |  2 +-
 .../vertx/http/VertxServerResponseToHttpServletResponse.java  |  6 ++----
 .../servicecomb/foundation/vertx/stream/PumpCommon.java       |  7 ++++++-
 .../servicecomb/foundation/vertx/stream/PumpFromPart.java     |  7 ++++---
 .../http/TestVertxServerResponseToHttpServletResponse.java    |  2 +-
 .../transport/rest/client/http/RestClientInvocation.java      |  7 +++++--
 7 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
index d9e4f62..d5a43c3 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/param/RestClientRequestImpl.java
@@ -45,6 +45,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 import io.vertx.core.Context;
+import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpClientRequest;
@@ -67,10 +68,18 @@ public class RestClientRequestImpl implements RestClientRequest {
 
   protected Buffer bodyBuffer;
 
+  private Handler<Throwable> throwableHandler;
+
   public RestClientRequestImpl(HttpClientRequest request, Context context, AsyncResponse asyncResp) {
+    this(request, context, asyncResp, null);
+  }
+
+  public RestClientRequestImpl(HttpClientRequest request, Context context, AsyncResponse asyncResp,
+                               Handler<Throwable> throwableHandler) {
     this.context = context;
     this.asyncResp = asyncResp;
     this.request = request;
+    this.throwableHandler = throwableHandler;
   }
 
   @Override
@@ -195,7 +204,7 @@ public class RestClientRequestImpl implements RestClientRequest {
     Buffer fileHeader = fileBoundaryInfo(boundary, name, part);
     request.write(fileHeader);
 
-    new PumpFromPart(context, part).toWriteStream(request).whenComplete((v, e) -> {
+    new PumpFromPart(context, part).toWriteStream(request, throwableHandler).whenComplete((v, e) -> {
       if (e != null) {
         LOGGER.debug("Failed to sending file [{}:{}].", name, filename, e);
         asyncResp.consumerFail(e);
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
index caf2887..3d1dcd6 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -91,7 +91,7 @@ public class ReadStreamPart extends AbstractPart {
    * so the return future only means finished read from readStream.
    */
   public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
-    return new PumpCommon().pump(context, readStream, writeStream);
+    return new PumpCommon().pump(context, readStream, writeStream, null);
   }
 
   public CompletableFuture<byte[]> saveAsBytes() {
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index 0d12c65..539591c 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -108,9 +108,7 @@ public class VertxServerResponseToHttpServletResponse extends AbstractHttpServle
       return;
     }
 
-    context.runOnContext(V -> {
-      internalFlushBuffer();
-    });
+    context.runOnContext(V -> internalFlushBuffer());
   }
 
   public void internalFlushBuffer() {
@@ -126,7 +124,7 @@ public class VertxServerResponseToHttpServletResponse extends AbstractHttpServle
   public CompletableFuture<Void> sendPart(Part part) {
     DownloadUtils.prepareDownloadHeader(this, part);
 
-    return new PumpFromPart(context, part).toWriteStream(serverResponse);
+    return new PumpFromPart(context, part).toWriteStream(serverResponse, null);
   }
 
   @Override
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
index 77334b9..5526bf0 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpCommon.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.foundation.vertx.stream;
 
 import java.util.concurrent.CompletableFuture;
 
+import io.vertx.core.Handler;
 import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
 
 import io.vertx.core.Context;
@@ -39,7 +40,8 @@ public class PumpCommon {
    * <p>  if writeStream is not AsyncCloseable, future only means read complete
    */
   @SuppressWarnings("unchecked")
-  public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream) {
+  public CompletableFuture<Void> pump(Context context, ReadStream<Buffer> readStream, WriteStream<Buffer> writeStream,
+          Handler<Throwable> throwableHandler) {
     CompletableFuture<Void> readFuture = new CompletableFuture<>();
 
     writeStream.exceptionHandler(e -> {
@@ -54,6 +56,9 @@ public class PumpCommon {
         // so can only close the connection.
         ((HttpClientResponse) readStream).request().connection().close();
       }
+      if(throwableHandler != null){
+        throwableHandler.handle(e);
+      }
       readFuture.completeExceptionally(e);
     });
     readStream.exceptionHandler(readFuture::completeExceptionally);
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
index 348321c..d76e7f2 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFromPart.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.servlet.http.Part;
 
+import io.vertx.core.Handler;
 import org.apache.commons.io.IOUtils;
 import org.apache.servicecomb.foundation.vertx.http.DownloadUtils;
 import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
@@ -62,9 +63,9 @@ public class PumpFromPart {
     return future;
   }
 
-  public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+  public CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream, Handler<Throwable> throwableHandler) {
     return prepareReadStream()
-        .thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream))
+        .thenCompose(readStream -> new PumpCommon().pump(context, readStream, writeStream, throwableHandler))
         .whenComplete((v, e) -> DownloadUtils.clearPartResource(part));
   }
 
@@ -79,7 +80,7 @@ public class PumpFromPart {
   private CompletableFuture<Void> toOutputStreamAsync(OutputStream outputStream, boolean autoCloseOutputStream) {
     OutputStreamToWriteStream outputStreamToWriteStream = new OutputStreamToWriteStream(context, outputStream,
         autoCloseOutputStream);
-    return toWriteStream(outputStreamToWriteStream);
+    return toWriteStream(outputStreamToWriteStream, null);
   }
 
   // DO NOT use a mocked sync context to unify the pump logic
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index e82c662..66150d6 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -372,7 +372,7 @@ public class TestVertxServerResponseToHttpServletResponse {
     CompletableFuture<Void> future = new CompletableFuture<>();
     new MockUp<PumpFromPart>() {
       @Mock
-      CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream) {
+      CompletableFuture<Void> toWriteStream(WriteStream<Buffer> writeStream, Handler<Throwable> throwableHandler) {
         return future;
       }
     };
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 995575a..91db649 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
 
+import io.vertx.core.Handler;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpClientRequest;
 import io.vertx.core.http.HttpClientResponse;
@@ -76,6 +77,8 @@ public class RestClientInvocation {
 
   private HttpClientResponse clientResponse;
 
+  private Handler<Throwable> throwableHandler = e -> fail((ConnectionBase) clientRequest.connection(), e);
+
   public RestClientInvocation(HttpClientWithContext httpClientWithContext, List<HttpClientFilter> httpClientFilters) {
     this.httpClientWithContext = httpClientWithContext;
     this.httpClientFilters = httpClientFilters;
@@ -94,7 +97,7 @@ public class RestClientInvocation {
     createRequest(ipPort, path);
     clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
     RestClientRequestImpl restClientRequest =
-        new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp);
+        new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
     invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);
 
     Buffer requestBodyBuffer = restClientRequest.getBodyBuffer();
@@ -109,7 +112,7 @@ public class RestClientInvocation {
     clientRequest.exceptionHandler(e -> {
       LOGGER.error(invocation.getMarker(), "Failed to send request, local:{}, remote:{}.",
           getLocalAddress(), ipPort.getSocketAddress(), e);
-      fail((ConnectionBase) clientRequest.connection(), e);
+      throwableHandler.handle(e);
     });
     clientRequest.connectionHandler(connection -> {
       LOGGER.debug("http connection connected, local:{}, remote:{}.",