You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/22 08:32:28 UTC
[incubator-servicecomb-java-chassis] 01/09: [SCB-486] create
ReadStreamPart to support route download stream in edge
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 840d3a2c100db6ecd6e13526d359e9bc1098e21a
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat Apr 21 12:45:16 2018 +0800
[SCB-486] create ReadStreamPart to support route download stream in edge
---
.../foundation/vertx/http/ReadStreamPart.java | 63 ++++++++++
.../VertxServerResponseToHttpServletResponse.java | 15 ++-
.../foundation/vertx/http/TestReadStreamPart.java | 131 +++++++++++++++++++++
...stVertxServerResponseToHttpServletResponse.java | 16 +++
4 files changed, 219 insertions(+), 6 deletions(-)
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
new file mode 100644
index 0000000..4014609
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.servlet.http.Part;
+
+import org.apache.servicecomb.foundation.common.part.AbstractPart;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+/**
+ * this is not a really part type, all method extend from AbstractPart is undefined except:<br>
+ * 1.getContentType<br>
+ * 2.getSubmittedFileName<br>
+ * extend from AbstractPart just because want to make it be Part type,
+ * so that can be sent by
+ * {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart}
+ */
+public class ReadStreamPart extends AbstractPart {
+ private ReadStream<Buffer> readStream;
+
+ public ReadStreamPart(ReadStream<Buffer> readStream) {
+ this.readStream = readStream;
+
+ readStream.pause();
+ }
+
+ public CompletableFuture<Void> saveToWriteStream(WriteStream<Buffer> writeStream) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ writeStream.exceptionHandler(future::completeExceptionally);
+ readStream.exceptionHandler(future::completeExceptionally);
+ readStream.endHandler(future::complete);
+
+ // if readStream(HttpClientResponse) and writeStream(HttpServerResponse)
+ // belongs to difference eventloop
+ // maybe will cause deadlock
+ // if happened, vertx will print deadlock stacks
+ Pump.pump(readStream, writeStream).start();
+ readStream.resume();
+
+ return future;
+ }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
index d467058..8f55886 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java
@@ -132,23 +132,26 @@ public class VertxServerResponseToHttpServletResponse extends AbstractHttpServle
@Override
public CompletableFuture<Void> sendPart(Part part) {
- CompletableFuture<Void> future = new CompletableFuture<Void>();
-
prepareSendPartHeader(part);
+ if (ReadStreamPart.class.isInstance(part)) {
+ return ((ReadStreamPart) part).saveToWriteStream(this.serverResponse);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
try {
InputStream is = part.getInputStream();
context.runOnContext(v -> {
- InputStreamToReadStream aa = new InputStreamToReadStream(context.owner(), is);
- aa.exceptionHandler(t -> {
+ InputStreamToReadStream inputStreamToReadStream = new InputStreamToReadStream(context.owner(), is);
+ inputStreamToReadStream.exceptionHandler(t -> {
clearPartResource(part, is);
future.completeExceptionally(t);
});
- aa.endHandler(V -> {
+ inputStreamToReadStream.endHandler(V -> {
clearPartResource(part, is);
future.complete(null);
});
- Pump.pump(aa, serverResponse).start();
+ Pump.pump(inputStreamToReadStream, serverResponse).start();
});
} catch (IOException e) {
future.completeExceptionally(e);
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
new file mode 100644
index 0000000..cffd964
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.http;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.WriteStream;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestReadStreamPart {
+ @Mocked
+ Vertx vertx;
+
+ String src = "src";
+
+ InputStreamToReadStream readStream;
+
+ ReadStreamPart part;
+
+ InputStream inputStream = new ByteArrayInputStream(src.getBytes());
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+
+ @Before
+ public void setup() {
+ readStream = new InputStreamToReadStream(vertx, inputStream);
+ part = new ReadStreamPart(readStream);
+
+ new MockUp<Vertx>(vertx) {
+ @Mock
+ <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
+ Handler<AsyncResult<T>> resultHandler) {
+ Future<T> future = Future.future();
+ blockingCodeHandler.handle(future);
+ future.setHandler(resultHandler);
+ }
+ };
+ }
+
+ @Test
+ public void saveToWriteStream() throws InterruptedException, ExecutionException {
+ Buffer buf = Buffer.buffer();
+ WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() {
+ @Mock
+ WriteStream<Buffer> write(Buffer data) {
+ buf.appendBuffer(data);
+ return null;
+ }
+ }.getMockInstance();
+
+ part.saveToWriteStream(writeStream).get();
+
+ Assert.assertEquals(src, buf.toString());
+ }
+
+ @Test
+ public void saveToWriteStream_writeException() throws InterruptedException, ExecutionException {
+ Error error = new Error();
+ WriteStream<Buffer> writeStream = new MockUp<WriteStream<Buffer>>() {
+ Handler<Throwable> exceptionHandler;
+
+ @Mock
+ WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
+ this.exceptionHandler = handler;
+ return null;
+ }
+
+ @Mock
+ WriteStream<Buffer> write(Buffer data) {
+ exceptionHandler.handle(error);
+ return null;
+ }
+ }.getMockInstance();
+
+ expectedException.expect(ExecutionException.class);
+ expectedException.expectCause(Matchers.sameInstance(error));
+
+ part.saveToWriteStream(writeStream).get();
+ }
+
+ @Test
+ public void saveToWrite_readException(@Mocked WriteStream<Buffer> writeStream)
+ throws InterruptedException, ExecutionException {
+ Error error = new Error();
+ new MockUp<InputStream>(inputStream) {
+ @Mock
+ int read(byte b[]) throws IOException {
+ throw error;
+ }
+ };
+
+ expectedException.expect(ExecutionException.class);
+ expectedException.expectCause(Matchers.sameInstance(error));
+
+ part.saveToWriteStream(writeStream).get();
+ }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index fd6202c..f66dd1b 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -47,6 +47,7 @@ import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.impl.VertxImpl;
+import io.vertx.core.streams.WriteStream;
import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Mock;
@@ -366,6 +367,21 @@ public class TestVertxServerResponseToHttpServletResponse {
future.get();
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void sendPart_ReadStreamPart(@Mocked ReadStreamPart part)
+ throws IOException, InterruptedException, ExecutionException {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ new Expectations() {
+ {
+ part.saveToWriteStream((WriteStream<Buffer>) any);
+ result = future;
+ }
+ };
+
+ Assert.assertSame(future, response.sendPart(part));
+ }
+
@Test
public void sendPart_succ(@Mocked Part part, @Mocked InputStream inputStream)
throws IOException, InterruptedException, ExecutionException {
--
To stop receiving notification emails like this one, please contact
liubao@apache.org.