You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/22 08:32:28 UTC

[incubator-servicecomb-java-chassis] 01/09: [SCB-486] create ReadStreamPart to support route download stream in edge

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 840d3a2c100db6ecd6e13526d359e9bc1098e21a
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat Apr 21 12:45:16 2018 +0800

    [SCB-486] create ReadStreamPart to support route download stream in edge
---
 .../foundation/vertx/http/ReadStreamPart.java      |  63 ++++++++++
 .../VertxServerResponseToHttpServletResponse.java  |  15 ++-
 .../foundation/vertx/http/TestReadStreamPart.java  | 131 +++++++++++++++++++++
 ...stVertxServerResponseToHttpServletResponse.java |  16 +++
 4 files changed, 219 insertions(+), 6 deletions(-)

diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
new file mode 100644
index 0000000..4014609
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.servlet.http.Part;
+
+import org.apache.servicecomb.foundation.common.part.AbstractPart;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * This is not a real Part type; all methods inherited from AbstractPart are undefined except:<br>
+ * 1.getContentType<br>
+ * 2.getSubmittedFileName<br>
+ * It extends AbstractPart only so that it is of type Part,
+ * and can therefore be sent by
+ * {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart}
+ */
+public class ReadStreamPart extends AbstractPart {
+  private ReadStream<Buffer> readStream;
+
+  public ReadStreamPart(ReadStream<Buffer> readStream) {
+    this.readStream = readStream;
+
+    readStream.pause();
+  }
+
+  public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+
+    writeStream.exceptionHandler(future::completeExceptionally);
+    readStream.exceptionHandler(future::completeExceptionally);
+    readStream.endHandler(future::complete);
+
+    // If readStream (HttpClientResponse) and writeStream (HttpServerResponse)
+    // belong to different event loops,
+    // this may cause a deadlock.
+    // If that happens, Vert.x will print the deadlock stacks.
+    Pump.pump(readStream, writeStream).start();
+    readStream.resume();
+
+    return future;
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index d467058..8f55886 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -132,23 +132,26 @@ public class VertxServerResponseToHttpServletResponse extends AbstractHttpServle
 
   @Override
   public CompletableFuture<Void> sendPart(Part part) {
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-
     prepareSendPartHeader(part);
 
+    if (ReadStreamPart.class.isInstance(part)) {
+      return ((ReadStreamPart) part).saveToWriteStream(this.serverResponse);
+    }
+
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
     try {
       InputStream is = part.getInputStream();
       context.runOnContext(v -> {
-        InputStreamToReadStream aa = new InputStreamToReadStream(context.owner(), is);
-        aa.exceptionHandler(t -> {
+        InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context.owner(), is);
+        inputStreamToReadStream.exceptionHandler(t -> {
           clearPartResource(part, is);
           future.completeExceptionally(t);
         });
-        aa.endHandler(V -> {
+        inputStreamToReadStream.endHandler(V -> {
           clearPartResource(part, is);
           future.complete(null);
         });
-        Pump.pump(aa, serverResponse).start();
+        Pump.pump(inputStreamToReadStream, serverResponse).start();
       });
     } catch (IOException e) {
       future.completeExceptionally(e);
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
new file mode 100644
index 0000000..cffd964
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestReadStreamPart {
+  @Mocked
+  Vertx vertx;
+
+  String src = "src";
+
+  InputStreamToReadStream readStream;
+
+  ReadStreamPart part;
+
+  InputStream inputStream = new ByteArrayInputStream(src.getBytes());
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+
+  @Before
+  public void setup() {
+    readStream = new InputStreamToReadStream(vertx, inputStream);
+    part = new ReadStreamPart(readStream);
+
+    new MockUp<Vertx>(vertx) {
+      @Mock
+      <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
+          Handler<AsyncResult<T>> resultHandler) {
+        Future<T> future = Future.future();
+        blockingCodeHandler.handle(future);
+        future.setHandler(resultHandler);
+      }
+    };
+  }
+
+  @Test
+  public void saveToWriteStream() throws InterruptedException, ExecutionException {
+    Buffer buf = Buffer.buffer();
+    WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() {
+      @Mock
+      WriteStream<Buffer> write(Buffer data) {
+        buf.appendBuffer(data);
+        return null;
+      }
+    }.getMockInstance();
+
+    part.saveToWriteStream(writeStream).get();
+
+    Assert.assertEquals(src, buf.toString());
+  }
+
+  @Test
+  public void saveToWriteStream_writeException() throws InterruptedException, ExecutionException {
+    Error error = new Error();
+    WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() {
+      Handler<Throwable> exceptionHandler;
+
+      @Mock
+      WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
+        this.exceptionHandler = handler;
+        return null;
+      }
+
+      @Mock
+      WriteStream<Buffer> write(Buffer data) {
+        exceptionHandler.handle(error);
+        return null;
+      }
+    }.getMockInstance();
+
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(Matchers.sameInstance(error));
+
+    part.saveToWriteStream(writeStream).get();
+  }
+
+  @Test
+  public void saveToWrite_readException(@Mocked WriteStream<Buffer> writeStream)
+      throws InterruptedException, ExecutionException {
+    Error error = new Error();
+    new MockUp<InputStream>(inputStream) {
+      @Mock
+      int read(byte b[]) throws IOException {
+        throw error;
+      }
+    };
+
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(Matchers.sameInstance(error));
+
+    part.saveToWriteStream(writeStream).get();
+  }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index fd6202c..f66dd1b 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -47,6 +47,7 @@ import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.impl.VertxImpl;
+import io.vertx.core.streams.WriteStream;
 import mockit.Deencapsulation;
 import mockit.Expectations;
 import mockit.Mock;
@@ -366,6 +367,21 @@ public class TestVertxServerResponseToHttpServletResponse {
     future.get();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part)
+      throws IOException, InterruptedException, ExecutionException {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    new Expectations() {
+      {
+        part.saveToWriteStream((WriteStream<Buffer>) any);
+        result = future;
+      }
+    };
+
+    Assert.assertSame(future, response.sendPart(part));
+  }
+
   @Test
   public void sendPart_succ(@Mocked Part part, @Mocked InputStream inputStream)
       throws IOException, InterruptedException, ExecutionException {

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.