You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/23 02:26:16 UTC
[incubator-servicecomb-java-chassis] 02/05: [SCB-487]
ReadStreamPart support saveAsBytes/saveAsString/saveToFile
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit be790e6b559700faac6dc677848d586f1e0a3af0
Author: wujimin <wu...@huawei.com>
AuthorDate: Sun Apr 22 12:45:08 2018 +0800
[SCB-487] ReadStreamPart support saveAsBytes/saveAsString/saveToFile
---
.../foundation/vertx/http/ReadStreamPart.java | 103 +++++++++++++-
.../io/vertx/core/file/impl/AsyncFileUitls.java | 28 ++++
.../foundation/vertx/http/TestReadStreamPart.java | 158 ++++++++++++++++++++-
.../rest/client/http/RestClientInvocation.java | 2 +-
4 files changed, 285 insertions(+), 6 deletions(-)
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
index 4014609..ddc56c9 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -16,13 +16,26 @@
*/
package org.apache.servicecomb.foundation.vertx.http;
+import java.io.File;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import javax.servlet.http.Part;
+import javax.ws.rs.core.HttpHeaders;
+import org.apache.commons.lang.StringUtils;
+import org.apache.servicecomb.foundation.common.http.HttpUtils;
import org.apache.servicecomb.foundation.common.part.AbstractPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
@@ -36,9 +49,26 @@ import io.vertx.core.streams.WriteStream;
* {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart}
*/
public class ReadStreamPart extends AbstractPart {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadStreamPart.class);
+
+ private Context context;
+
private ReadStream<Buffer> readStream;
- public ReadStreamPart(ReadStream<Buffer> readStream) {
+ public ReadStreamPart(Context context, HttpClientResponse httpClientResponse) { // builds a part from an HTTP client response
+ this(context, (ReadStream<Buffer>) httpClientResponse); // the cast selects the (Context, ReadStream<Buffer>) constructor
+
+ setSubmittedFileName( // file name is parsed from the Content-Disposition response header
+ HttpUtils.parseFileNameFromHeaderValue(httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION)));
+
+ String contentType = httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+ if (StringUtils.isNotEmpty(contentType)) { // a missing or empty header leaves the content type untouched
+ this.contentType(contentType);
+ }
+ }
+
+ public ReadStreamPart(Context context, ReadStream<Buffer> readStream) {
+ this.context = context;
this.readStream = readStream;
readStream.pause();
@@ -60,4 +90,75 @@ public class ReadStreamPart extends AbstractPart {
return future;
}
+
+ public CompletableFuture<byte[]> saveAsBytes() { // completes with the bytes of the fully collected stream
+ return saveAs(buf -> {
+ return buf.getBytes();
+ });
+ }
+
+ public CompletableFuture<String> saveAsString() { // completes with the fully collected stream as text
+ return saveAs(buf -> {
+ return buf.toString(); // NOTE(review): charset is whatever Buffer.toString() defaults to — confirm against Vert.x docs
+ });
+ }
+
+ public <T> CompletableFuture<T> saveAs(Function<Buffer, T> converter) { // collects the whole stream in memory, then converts it
+ CompletableFuture<T> future = new CompletableFuture<>();
+ Buffer buffer = Buffer.buffer(); // accumulates every chunk delivered by readStream
+
+ readStream.exceptionHandler(future::completeExceptionally); // a stream error fails the returned future
+ readStream.handler(buffer::appendBuffer);
+ readStream.endHandler(v -> {
+ future.complete(converter.apply(buffer)); // conversion runs only once the stream has ended
+ });
+ readStream.resume(); // the constructor paused the stream; resume after all handlers are registered
+
+ return future;
+ }
+
+ public CompletableFuture<File> saveToFile(String fileName) { // creates missing parent directories, then saves to a new file
+ File file = new File(fileName);
+ File parent = file.getAbsoluteFile().getParentFile(); // resolve first: a bare name such as "a.txt" has no parent of its own
+ if (parent != null) { parent.mkdirs(); } // parent is null only for a filesystem root
+ return saveToFile(file, new OpenOptions().setCreateNew(true)); // delegates the open and write to the File overload
+ }
+
+ public CompletableFuture<File> saveToFile(File file, OpenOptions openOptions) { // opens file asynchronously, then streams into it
+ CompletableFuture<File> future = new CompletableFuture<>();
+
+ Vertx vertx = context.owner(); // the file is opened through the Vertx instance that owns this part's context
+ vertx.fileSystem().open(file.getAbsolutePath(), openOptions, ar -> {
+ onFileOpened(file, ar, future); // continues with the write, or fails the future when the open failed
+ });
+
+ return future;
+ }
+
+ protected void onFileOpened(File file, AsyncResult<AsyncFile> ar, CompletableFuture<File> future) { // callback of the file open
+ if (ar.failed()) {
+ future.completeExceptionally(ar.cause()); // open failed: there is nothing to write or close
+ return;
+ }
+
+ AsyncFile asyncFile = ar.result();
+ CompletableFuture<Void> saveFuture = saveToWriteStream(asyncFile);
+ saveFuture.whenComplete((v, saveException) -> {
+ asyncFile.close(closeAr -> { // always close the file, whether the write succeeded or failed
+ if (closeAr.failed()) {
+ LOGGER.error("Failed to close file {}.", file, closeAr.cause()); // trailing throwable keeps the stack trace in the log
+ }
+
+ // Whether the close succeeded or failed
+ // does not affect the result;
+ // the result depends only on the write.
+ if (saveException == null) {
+ future.complete(file);
+ return;
+ }
+
+ future.completeExceptionally(saveException);
+ });
+ });
+ }
}
diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java
new file mode 100644
index 0000000..a458f9b
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.vertx.core.file.impl;
+
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.impl.ContextImpl;
+import io.vertx.core.impl.VertxInternal;
+
+public class AsyncFileUitls { // test helper declared inside the io.vertx.core.file.impl package
+ public static AsyncFile createAsyncFile(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) {
+ return new AsyncFileImpl(vertx, path, options, context); // NOTE(review): presumably this constructor is not visible outside the package — confirm
+ }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
index cffd964..0f9e730 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -17,10 +17,15 @@
package org.apache.servicecomb.foundation.vertx.http;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import javax.ws.rs.core.HttpHeaders;
+
+import org.apache.commons.io.FileUtils;
import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -30,18 +35,35 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.FileSystemException;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.file.impl.AsyncFileUitls;
+import io.vertx.core.file.impl.FileSystemImpl;
+import io.vertx.core.file.impl.WindowsFileSystem;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.impl.ContextImpl;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.impl.Utils;
+import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.WriteStream;
+import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class TestReadStreamPart {
@Mocked
- Vertx vertx;
+ VertxInternal vertx;
+
+ // @Mocked
+ ContextImpl context;
String src = "src";
@@ -54,14 +76,31 @@ public class TestReadStreamPart {
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ FileSystem fileSystem;
+
+ protected FileSystem getFileSystem() { // picks the Windows or the generic file system implementation for the current OS
+ return Utils.isWindows() ? new WindowsFileSystem(vertx) : new FileSystemImpl(vertx);
+ }
@Before
public void setup() {
- readStream = new InputStreamToReadStream(vertx, inputStream);
- part = new ReadStreamPart(readStream);
-
new MockUp<Vertx>(vertx) {
@Mock
+ FileSystem fileSystem() {
+ return fileSystem;
+ }
+
+ @Mock
+ ContextImpl getContext() {
+ return context;
+ }
+
+ @Mock
+ ContextImpl getOrCreateContext() {
+ return context;
+ }
+
+ @Mock
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> resultHandler) {
Future<T> future = Future.future();
@@ -69,6 +108,78 @@ public class TestReadStreamPart {
future.setHandler(resultHandler);
}
};
+
+ context = new EventLoopContext(vertx, null, null, null, "id", null, null);
+ new MockUp<Context>(context) {
+ @Mock
+ Vertx owner() {
+ return vertx;
+ }
+
+ @Mock
+ void runOnContext(Handler<Void> task) {
+ task.handle(null);
+ }
+
+ @Mock
+ <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
+ Future<T> future = Future.future();
+ blockingCodeHandler.handle(future);
+ future.setHandler(resultHandler);
+ }
+ };
+
+ fileSystem = getFileSystem();
+
+ readStream = new InputStreamToReadStream(vertx, inputStream);
+ part = new ReadStreamPart(context, readStream);
+
+ new MockUp<FileSystem>(fileSystem) {
+ @Mock
+ FileSystem open(String path, OpenOptions options, Handler<AsyncResult<AsyncFile>> handler) {
+ try {
+ AsyncFile asyncFile = AsyncFileUitls.createAsyncFile(vertx, path, options, context);
+ handler.handle(Future.succeededFuture(asyncFile));
+ } catch (Exception e) {
+ handler.handle(Future.failedFuture(e));
+ }
+ return fileSystem;
+ }
+ };
+ }
+
+ @Test
+ public void constructFromHttpClientResponse_noContentType(@Mocked HttpClientResponse httpClientResponse) {
+ new Expectations() {
+ {
+ httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION);
+ result = "xx;filename=name.txt";
+ httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+ result = null; // the response carries no Content-Type header
+ }
+ };
+
+ part = new ReadStreamPart(context, httpClientResponse);
+
+ Assert.assertEquals("name.txt", part.getSubmittedFileName()); // file name parsed from Content-Disposition
+ Assert.assertEquals("text/plain", part.getContentType()); // NOTE(review): value presumably comes from the part's own default or the file name — confirm
+ }
+
+ @Test
+ public void constructFromHttpClientResponse_hasContentType(@Mocked HttpClientResponse httpClientResponse) {
+ new Expectations() {
+ {
+ httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION);
+ result = "xx;filename=name.txt";
+ httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+ result = "type"; // the response carries an explicit Content-Type header
+ }
+ };
+
+ part = new ReadStreamPart(context, httpClientResponse);
+
+ Assert.assertEquals("name.txt", part.getSubmittedFileName()); // file name parsed from Content-Disposition
+ Assert.assertEquals("type", part.getContentType()); // the header value is used as the part's content type
+ }
@Test
@@ -128,4 +239,43 @@ public class TestReadStreamPart {
part.saveToWriteStream(writeStream).get();
}
+
+ @Test
+ public void saveAsBytes() throws InterruptedException, ExecutionException {
+ Assert.assertArrayEquals(src.getBytes(), part.saveAsBytes().get()); // the collected bytes must equal the source text's bytes
+ }
+
+ @Test
+ public void saveAsString() throws InterruptedException, ExecutionException {
+ Assert.assertEquals(src, part.saveAsString().get()); // the collected text must equal the source text
+ }
+
+ @Test
+ public void saveToFile() throws InterruptedException, ExecutionException, IOException {
+ File dir = new File("target/notExist-" + UUID.randomUUID().toString()); // random suffix gives a fresh directory name per run
+ File file = new File(dir, "a.txt");
+
+ Assert.assertFalse(dir.exists());
+
+ part.saveToFile(file.getAbsolutePath()).get(); // the String overload creates the missing parent directory
+
+ Assert.assertEquals(src, FileUtils.readFileToString(file));
+
+ FileUtils.forceDelete(dir); // cleanup; NOTE(review): not reached when an assertion above fails
+ Assert.assertFalse(dir.exists());
+ }
+
+ @Test
+ public void saveToFile_notExist_notCreate() throws InterruptedException, ExecutionException, IOException {
+ File dir = new File("target/notExist-" + UUID.randomUUID().toString()); // random suffix gives a fresh directory name per run
+ File file = new File(dir, "a.txt");
+
+ Assert.assertFalse(dir.exists());
+
+ expectedException.expect(ExecutionException.class); // get() wraps the failure of the returned future
+ expectedException.expectCause(Matchers.instanceOf(FileSystemException.class));
+
+ OpenOptions openOptions = new OpenOptions().setCreateNew(false);
+ part.saveToFile(file, openOptions).get(); // the File overload does not create the parent directory, so the open must fail
+ }
}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 9a586a1..db148dd 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -155,7 +155,7 @@ public class RestClientInvocation {
if (HttpStatus.isSuccess(clientResponse.statusCode())
&& Part.class.equals(invocation.getOperationMeta().getMethod().getReturnType())) {
- ReadStreamPart part = new ReadStreamPart(httpClientResponse);
+ ReadStreamPart part = new ReadStreamPart(httpClientWithContext.context(), httpClientResponse);
invocation.getHandlerContext().put(RestConst.READ_STREAM_PART, part);
processResponseBody(null);
return;
--
To stop receiving notification emails like this one, please contact
liubao@apache.org.