You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/23 02:26:16 UTC

[incubator-servicecomb-java-chassis] 02/05: [SCB-487] ReadStreamPart support saveAsBytes/saveAsString/saveToFile

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit be790e6b559700faac6dc677848d586f1e0a3af0
Author: wujimin <wu...@huawei.com>
AuthorDate: Sun Apr 22 12:45:08 2018 +0800

    [SCB-487] ReadStreamPart support saveAsBytes/saveAsString/saveToFile
---
 .../foundation/vertx/http/ReadStreamPart.java      | 103 +++++++++++++-
 .../io/vertx/core/file/impl/AsyncFileUitls.java    |  28 ++++
 .../foundation/vertx/http/TestReadStreamPart.java  | 158 ++++++++++++++++++++-
 .../rest/client/http/RestClientInvocation.java     |   2 +-
 4 files changed, 285 insertions(+), 6 deletions(-)

diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
index 4014609..ddc56c9 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/ReadStreamPart.java
@@ -16,13 +16,26 @@
  */
 package org.apache.servicecomb.foundation.vertx.http;
 
+import java.io.File;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 import javax.servlet.http.Part;
+import javax.ws.rs.core.HttpHeaders;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.servicecomb.foundation.common.http.HttpUtils;
 import org.apache.servicecomb.foundation.common.part.AbstractPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.http.HttpClientResponse;
 import io.vertx.core.streams.Pump;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
@@ -36,9 +49,26 @@ import io.vertx.core.streams.WriteStream;
  * {@link org.apache.servicecomb.foundation.vertx.http.VertxServerResponseToHttpServletResponse#sendPart(Part) VertxServerResponseToHttpServletResponse.sendPart}
  */
 public class ReadStreamPart extends AbstractPart {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadStreamPart.class);
+
+  private Context context;
+
   private ReadStream<Buffer> readStream;
 
-  public ReadStreamPart(ReadStream<Buffer> readStream) {
+  public ReadStreamPart(Context context, HttpClientResponse httpClientResponse) {
+    this(context, (ReadStream<Buffer>) httpClientResponse);
+
+    setSubmittedFileName(
+        HttpUtils.parseFileNameFromHeaderValue(httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION)));
+
+    String contentType = httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+    if (StringUtils.isNotEmpty(contentType)) {
+      this.contentType(contentType);
+    }
+  }
+
+  public ReadStreamPart(Context context, ReadStream<Buffer> readStream) {
+    this.context = context;
     this.readStream = readStream;
 
     readStream.pause();
@@ -60,4 +90,75 @@ public class ReadStreamPart extends AbstractPart {
 
     return future;
   }
+
+  public CompletableFuture<byte[]> saveAsBytes() {
+    return saveAs(buf -> {
+      return buf.getBytes();
+    });
+  }
+
+  public CompletableFuture<String> saveAsString() {
+    return saveAs(buf -> {
+      return buf.toString();
+    });
+  }
+
+  public <T> CompletableFuture<T> saveAs(Function<Buffer, T> converter) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    Buffer buffer = Buffer.buffer();
+
+    readStream.exceptionHandler(future::completeExceptionally);
+    readStream.handler(buffer::appendBuffer);
+    readStream.endHandler(v -> {
+      future.complete(converter.apply(buffer));
+    });
+    readStream.resume();
+
+    return future;
+  }
+
+  public CompletableFuture<File> saveToFile(String fileName) {
+    File file = new File(fileName);
+    file.getAbsoluteFile().getParentFile().mkdirs(); // a bare relative name has a null parent
+    OpenOptions openOptions = new OpenOptions().setCreateNew(true);
+    return saveToFile(file, openOptions);
+  }
+
+  public CompletableFuture<File> saveToFile(File file, OpenOptions openOptions) {
+    CompletableFuture<File> future = new CompletableFuture<>();
+
+    Vertx vertx = context.owner();
+    vertx.fileSystem().open(file.getAbsolutePath(), openOptions, ar -> {
+      onFileOpened(file, ar, future);
+    });
+
+    return future;
+  }
+
+  protected void onFileOpened(File file, AsyncResult<AsyncFile> ar, CompletableFuture<File> future) {
+    if (ar.failed()) {
+      future.completeExceptionally(ar.cause());
+      return;
+    }
+
+    AsyncFile asyncFile = ar.result();
+    CompletableFuture<Void> saveFuture = saveToWriteStream(asyncFile);
+    saveFuture.whenComplete((v, saveException) -> {
+      asyncFile.close(closeAr -> {
+        if (closeAr.failed()) {
+          LOGGER.error("Failed to close file {}.", file, closeAr.cause());
+        }
+
+        // Whether the close succeeds or fails does not affect the result:
+        // a close failure is only logged above, and the returned future
+        // reflects solely the outcome of the write.
+        if (saveException == null) {
+          future.complete(file);
+          return;
+        }
+
+        future.completeExceptionally(saveException);
+      });
+    });
+  }
 }
diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java
new file mode 100644
index 0000000..a458f9b
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.vertx.core.file.impl;
+
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.impl.ContextImpl;
+import io.vertx.core.impl.VertxInternal;
+
+public class AsyncFileUitls {
+  public static AsyncFile createAsyncFile(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) {
+    return new AsyncFileImpl(vertx, path, options, context);
+  }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
index cffd964..0f9e730 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -17,10 +17,15 @@
 package org.apache.servicecomb.foundation.vertx.http;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
+import javax.ws.rs.core.HttpHeaders;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.servicecomb.foundation.vertx.stream.InputStreamToReadStream;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -30,18 +35,35 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.FileSystemException;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.core.file.impl.AsyncFileUitls;
+import io.vertx.core.file.impl.FileSystemImpl;
+import io.vertx.core.file.impl.WindowsFileSystem;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.impl.ContextImpl;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.impl.Utils;
+import io.vertx.core.impl.VertxInternal;
 import io.vertx.core.streams.WriteStream;
+import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
 
 public class TestReadStreamPart {
   @Mocked
-  Vertx vertx;
+  VertxInternal vertx;
+
+  //  @Mocked
+  ContextImpl context;
 
   String src = "src";
 
@@ -54,14 +76,31 @@ public class TestReadStreamPart {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  FileSystem fileSystem;
+
+  protected FileSystem getFileSystem() {
+    return Utils.isWindows() ? new WindowsFileSystem(vertx) : new FileSystemImpl(vertx);
+  }
 
   @Before
   public void setup() {
-    readStream = new InputStreamToReadStream(vertx, inputStream);
-    part = new ReadStreamPart(readStream);
-
     new MockUp<Vertx>(vertx) {
       @Mock
+      FileSystem fileSystem() {
+        return fileSystem;
+      }
+
+      @Mock
+      ContextImpl getContext() {
+        return context;
+      }
+
+      @Mock
+      ContextImpl getOrCreateContext() {
+        return context;
+      }
+
+      @Mock
       <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
           Handler<AsyncResult<T>> resultHandler) {
         Future<T> future = Future.future();
@@ -69,6 +108,78 @@ public class TestReadStreamPart {
         future.setHandler(resultHandler);
       }
     };
+
+    context = new EventLoopContext(vertx, null, null, null, "id", null, null);
+    new MockUp<Context>(context) {
+      @Mock
+      Vertx owner() {
+        return vertx;
+      }
+
+      @Mock
+      void runOnContext(Handler<Void> task) {
+        task.handle(null);
+      }
+
+      @Mock
+      <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
+        Future<T> future = Future.future();
+        blockingCodeHandler.handle(future);
+        future.setHandler(resultHandler);
+      }
+    };
+
+    fileSystem = getFileSystem();
+
+    readStream = new InputStreamToReadStream(vertx, inputStream);
+    part = new ReadStreamPart(context, readStream);
+
+    new MockUp<FileSystem>(fileSystem) {
+      @Mock
+      FileSystem open(String path, OpenOptions options, Handler<AsyncResult<AsyncFile>> handler) {
+        try {
+          AsyncFile asyncFile = AsyncFileUitls.createAsyncFile(vertx, path, options, context);
+          handler.handle(Future.succeededFuture(asyncFile));
+        } catch (Exception e) {
+          handler.handle(Future.failedFuture(e));
+        }
+        return fileSystem;
+      }
+    };
+  }
+
+  @Test
+  public void constructFromHttpClientResponse_noContentType(@Mocked HttpClientResponse httpClientResponse) {
+    new Expectations() {
+      {
+        httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION);
+        result = "xx;filename=name.txt";
+        httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+        result = null;
+      }
+    };
+
+    part = new ReadStreamPart(context, httpClientResponse);
+
+    Assert.assertEquals("name.txt", part.getSubmittedFileName());
+    Assert.assertEquals("text/plain", part.getContentType());
+  }
+
+  @Test
+  public void constructFromHttpClientResponse_hasContentType(@Mocked HttpClientResponse httpClientResponse) {
+    new Expectations() {
+      {
+        httpClientResponse.getHeader(HttpHeaders.CONTENT_DISPOSITION);
+        result = "xx;filename=name.txt";
+        httpClientResponse.getHeader(HttpHeaders.CONTENT_TYPE);
+        result = "type";
+      }
+    };
+
+    part = new ReadStreamPart(context, httpClientResponse);
+
+    Assert.assertEquals("name.txt", part.getSubmittedFileName());
+    Assert.assertEquals("type", part.getContentType());
   }
 
   @Test
@@ -128,4 +239,43 @@ public class TestReadStreamPart {
 
     part.saveToWriteStream(writeStream).get();
   }
+
+  @Test
+  public void saveAsBytes() throws InterruptedException, ExecutionException {
+    Assert.assertArrayEquals(src.getBytes(), part.saveAsBytes().get());
+  }
+
+  @Test
+  public void saveAsString() throws InterruptedException, ExecutionException {
+    Assert.assertEquals(src, part.saveAsString().get());
+  }
+
+  @Test
+  public void saveToFile() throws InterruptedException, ExecutionException, IOException {
+    File dir = new File("target/notExist-" + UUID.randomUUID().toString());
+    File file = new File(dir, "a.txt");
+
+    Assert.assertFalse(dir.exists());
+
+    part.saveToFile(file.getAbsolutePath()).get();
+
+    Assert.assertEquals(src, FileUtils.readFileToString(file));
+
+    FileUtils.forceDelete(dir);
+    Assert.assertFalse(dir.exists());
+  }
+
+  @Test
+  public void saveToFile_notExist_notCreate() throws InterruptedException, ExecutionException, IOException {
+    File dir = new File("target/notExist-" + UUID.randomUUID().toString());
+    File file = new File(dir, "a.txt");
+
+    Assert.assertFalse(dir.exists());
+
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(Matchers.instanceOf(FileSystemException.class));
+
+    OpenOptions openOptions = new OpenOptions().setCreateNew(false);
+    part.saveToFile(file, openOptions).get();
+  }
 }
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 9a586a1..db148dd 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -155,7 +155,7 @@ public class RestClientInvocation {
 
     if (HttpStatus.isSuccess(clientResponse.statusCode())
         && Part.class.equals(invocation.getOperationMeta().getMethod().getReturnType())) {
-      ReadStreamPart part = new ReadStreamPart(httpClientResponse);
+      ReadStreamPart part = new ReadStreamPart(httpClientWithContext.context(), httpClientResponse);
       invocation.getHandlerContext().put(RestConst.READ_STREAM_PART, part);
       processResponseBody(null);
       return;

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.