You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/12/28 06:39:05 UTC

[submarine] branch master updated: SUBMARINE-1074. S3 client for server

This is an automated email from the ASF dual-hosted git repository.

pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b426b5  SUBMARINE-1074. S3 client for server
5b426b5 is described below

commit 5b426b5d691b6ab24b743bf105a1e6ef4b7559b4
Author: jeff-901 <b0...@ntu.edu.tw>
AuthorDate: Sat Dec 25 10:25:10 2021 +0800

    SUBMARINE-1074. S3 client for server
    
    ### What is this PR for?
    Add s3 minio client in server.
    
    ### What type of PR is it?
    Feature
    
    ### Todos
    * [x] - list artifact paths by experiment id
    * [x] - upload an artifact
    * [x] - download an artifact
    * [x] - delete artifacts under an experiment id
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-1074
    
    ### How should this be tested?
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: jeff-901 <b0...@ntu.edu.tw>
    
    Signed-off-by: Kevin <pi...@apache.org>
    
    Closes #845 from jeff-901/SUBMARINE-1074 and squashes the following commits:
    
    51504561 [jeff-901] try with resource
    de64081d [jeff-901] create default bucket
    527073fe [jeff-901] update test minio url
    f5de64df [jeff-901] add minio pod in test
    96705061 [jeff-901] dependency and license
    1a784225 [jeff-901] add s3 client
    16ef4883 [jeff-901] add client
---
 .github/workflows/master.yml                       |   8 +
 pom.xml                                            |   3 +
 submarine-server/server-core/pom.xml               |  24 +++
 .../org/apache/submarine/server/s3/Client.java     | 172 +++++++++++++++++++++
 .../apache/submarine/server/s3/S3Constants.java    |  30 ++++
 .../org/apache/submarine/server/s3/ClientTest.java |  67 ++++++++
 6 files changed, 304 insertions(+)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 2d851dc..12ee564 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -351,6 +351,14 @@ jobs:
           - 3306:3306
         # wait until mysql is health
         options: --health-cmd "mysqladmin ping" --health-interval 10s --health-timeout 10s --health-retries 10
+      # MinIO service container for the server-core S3 client test: ClientTest connects to
+      # http://localhost:9000, and the keys/bucket below match the values in S3Constants.
+      minio:
+        image: bitnami/minio:latest
+        env:
+          MINIO_ACCESS_KEY: "submarine_minio"
+          MINIO_SECRET_KEY: "submarine_minio"
+          MINIO_DEFAULT_BUCKETS: "submarine"
+        ports:
+          - 9000:9000
     steps:
       - uses: actions/checkout@v2
         with:
diff --git a/pom.xml b/pom.xml
index 26a4d84..176735b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,10 +76,13 @@
     <gson.version>2.8.5</gson.version>
     <jackson-databind.version>2.11.0</jackson-databind.version>
     <jackson-annotations.version>2.11.0</jackson-annotations.version>
+    <jackson-core.version>2.11.0</jackson-core.version>
     <jackson-module-jaxb-annotations.version>2.11.0</jackson-module-jaxb-annotations.version>
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-httpclient.version>3.1</commons-httpclient.version>
 
+    <minio.version>7.1.4</minio.version>
+
     <cglib.version>3.3.0</cglib.version>
     <hibernate.version>5.6.0.Final</hibernate.version>
     <jboss.logging.version>3.4.2.Final</jboss.logging.version>
diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml
index 53a4183..9dd3d16 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -315,6 +315,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <!-- jackson-core at the project-wide ${jackson-core.version}; presumably declared explicitly
+         because the minio dependency in this pom excludes its transitive Jackson artifacts
+         (TODO confirm). -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson-core.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.google.code.gson</groupId>
@@ -418,6 +423,25 @@
       <version>1.18.0</version>
     </dependency>
 
+    <!-- MinIO Java SDK used by org.apache.submarine.server.s3.Client. Its transitive
+         jackson-core, jackson-databind and jackson-annotations are excluded; presumably so the
+         Jackson versions declared by this project are the ones on the classpath (TODO confirm). -->
+    <dependency>
+      <groupId>io.minio</groupId>
+      <artifactId>minio</artifactId>
+      <version>${minio.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java
new file mode 100644
index 0000000..02e4cb6
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.core.Response;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.DeleteError;
+import io.minio.messages.DeleteObject;
+import io.minio.messages.Item;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+
+
+/**
+ * Thin wrapper around a {@link MinioClient} that stores, lists, downloads and deletes
+ * artifacts in the {@link S3Constants#BUCKET} bucket.
+ *
+ * <p>Every failure reported by the underlying client is rethrown as a
+ * {@link SubmarineRuntimeException} carrying the INTERNAL_SERVER_ERROR status code.
+ */
+public class Client {
+  /** Size of the chunks used when draining a downloaded object stream. */
+  private static final int DOWNLOAD_CHUNK_SIZE = 8192;
+
+  public MinioClient minioClient;
+
+  /**
+   * Creates a client for the default endpoint {@link S3Constants#ENDPOINT}.
+   */
+  public Client() {
+    this(S3Constants.ENDPOINT);
+  }
+
+  /**
+   * Creates a client for the given endpoint, using the credentials from {@link S3Constants}.
+   *
+   * @param endpoint URL of the S3/MinIO service
+   */
+  public Client(String endpoint) {
+    minioClient = MinioClient.builder()
+        .endpoint(endpoint)
+        .credentials(S3Constants.ACCESSKEY, S3Constants.SECRETKEY)
+        .build();
+  }
+
+  /**
+   * Get a list of artifact path under the experiment.
+   *
+   * <p>The entries directly under {@code experimentId/} are listed first; each of them is
+   * then used as the prefix of a second, non-recursive listing whose object names are
+   * returned as {@code s3://<bucket>/<object name>}.
+   *
+   * @param experimentId experiment id
+   * @return a list of artifact path, empty when nothing is stored under the experiment
+   * @throws SubmarineRuntimeException if a listing result cannot be read
+   */
+  public List<String> listArtifactByExperimentId(String experimentId) throws SubmarineRuntimeException {
+    Iterable<Result<Item>> artifactNames = minioClient.listObjects(ListObjectsArgs.builder()
+        .bucket(S3Constants.BUCKET).prefix(experimentId + "/").delimiter("/").build());
+    List<String> response = new ArrayList<>();
+    for (Result<Item> artifactName : artifactNames) {
+      try {
+        Iterable<Result<Item>> artifacts = minioClient.listObjects(ListObjectsArgs.builder()
+            .bucket(S3Constants.BUCKET)
+            .prefix(artifactName.get().objectName()).delimiter("/").build());
+        for (Result<Item> artifact : artifacts) {
+          response.add("s3://" + S3Constants.BUCKET + "/" + artifact.get().objectName());
+        }
+      } catch (Exception e) {
+        throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+            e.getMessage());
+      }
+    }
+    return response;
+  }
+
+  /**
+   * Delete all the artifacts under given experiment name.
+   *
+   * @param experimentId experiment id
+   * @throws SubmarineRuntimeException if listing or deleting fails
+   */
+  public void deleteArtifactsByExperiment(String experimentId) {
+    deleteAllArtifactsByPrefix(experimentId + "/");
+  }
+
+  /**
+   * Delete all the artifacts under s3://submarine.
+   *
+   * @throws SubmarineRuntimeException if listing or deleting fails
+   */
+  public void deleteAllArtifacts() {
+    // Object keys are stored without a leading slash, so "everything in the bucket" is the
+    // empty prefix; a prefix of "/" is not guaranteed to match any key.
+    deleteAllArtifactsByPrefix("");
+  }
+
+  /**
+   * Download an artifact.
+   *
+   * @param path artifact path (object name inside the bucket)
+   * @return the full content of the object, an empty array for an empty object
+   * @throws SubmarineRuntimeException if the object cannot be fetched or read
+   */
+  public byte[] downloadArtifact(String path) {
+    try (InputStream is = minioClient.getObject(
+        GetObjectArgs.builder()
+            .bucket(S3Constants.BUCKET)
+            .object(path)
+            .build());
+        ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      // Read until end of stream. InputStream.available() is only an estimate of what can be
+      // read without blocking, so it must not be used to size the result buffer.
+      byte[] chunk = new byte[DOWNLOAD_CHUNK_SIZE];
+      int read;
+      while ((read = is.read(chunk)) != -1) {
+        out.write(chunk, 0, read);
+      }
+      return out.toByteArray();
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Upload an artifact.
+   *
+   * @param path path of the file (object name inside the bucket)
+   * @param content content of the given file
+   * @throws SubmarineRuntimeException if the upload fails
+   */
+  public void logArtifact(String path, byte[] content) throws SubmarineRuntimeException {
+    InputStream targetStream = new ByteArrayInputStream(content);
+    try {
+      // The object size is known up front, so the part size is left to the client (-1).
+      minioClient.putObject(PutObjectArgs.builder()
+          .bucket(S3Constants.BUCKET)
+          .object(path)
+          .stream(targetStream, content.length, -1)
+          .build());
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Delete every object whose name starts with the given prefix.
+   *
+   * <p>All matching object names are collected first and then removed with a single
+   * batch request, instead of re-sending a growing batch for every listed object.
+   *
+   * @param prefix object name prefix; the empty string matches the whole bucket
+   * @throws SubmarineRuntimeException if listing fails or any object cannot be deleted
+   */
+  private void deleteAllArtifactsByPrefix(String prefix) {
+    Iterable<Result<Item>> artifactNames = minioClient.listObjects(ListObjectsArgs.builder()
+        .bucket(S3Constants.BUCKET).prefix(prefix).recursive(true).build());
+    List<DeleteObject> objects = new LinkedList<>();
+    try {
+      for (Result<Item> artifactName : artifactNames) {
+        objects.add(new DeleteObject(artifactName.get().objectName()));
+      }
+      if (objects.isEmpty()) {
+        return;
+      }
+      Iterable<Result<DeleteError>> results = minioClient.removeObjects(
+          RemoveObjectsArgs.builder().bucket(S3Constants.BUCKET).objects(objects).build());
+      // The results must be iterated for the removal to be carried out; every element
+      // that comes back describes a failed deletion, so the first one is reported.
+      for (Result<DeleteError> result : results) {
+        DeleteError error = result.get();
+        throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+            "Error in deleting object " + error.objectName() + "; " + error.message());
+      }
+    } catch (SubmarineRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java
new file mode 100644
index 0000000..c92cacf
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+/**
+ * Connection settings used by the S3 {@code Client} in this package.
+ */
+public class S3Constants {
+  /** Endpoint used by the no-argument {@code Client} constructor. */
+  public static final String ENDPOINT = "http://submarine-minio-service:9000";
+
+  /** Bucket in which all artifacts are stored. */
+  public static final String BUCKET = "submarine";
+
+  /** Access key handed to the MinIO client builder. */
+  public static final String ACCESSKEY = "submarine_minio";
+
+  /** Secret key handed to the MinIO client builder. */
+  public static final String SECRETKEY = "submarine_minio";
+}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java
new file mode 100644
index 0000000..ff6ed97
--- /dev/null
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.List;
+
+
+/**
+ * Tests for {@link Client} that run against an S3 endpoint at http://localhost:9000.
+ */
+public class ClientTest {
+  private Client client = new Client("http://localhost:9000");
+  private final String testExperimentId = "experiment-sample";
+  private final String bucket = "s3://submarine";
+
+  /** Calls {@link Client#deleteAllArtifacts()} after every test. */
+  @After
+  public void cleanAll() {
+    client.deleteAllArtifacts();
+  }
+
+  /** Uploads one object and checks that downloading it returns the same bytes. */
+  @Test
+  public void testLogArtifactAndDownloadArtifact() {
+    byte[] uploaded = "0123456789".getBytes();
+    String objectPath = "sample_folder/sample_file";
+
+    client.logArtifact(objectPath, uploaded);
+
+    Assert.assertArrayEquals(uploaded, client.downloadArtifact(objectPath));
+  }
+
+  /**
+   * Uploads two objects under one experiment, checks that both are listed as s3:// paths,
+   * then deletes the experiment and checks that the listing is empty.
+   */
+  @Test
+  public void testListArtifactByExperimentIdAndDeleteArtifactByExperiment() {
+    byte[] payload = "0123456789".getBytes();
+    String modelFolder = testExperimentId + "/" + "sample";
+    String[] versions = {"1", "2"};
+
+    String[] expectedPaths = new String[versions.length];
+    for (int i = 0; i < versions.length; i++) {
+      String objectPath = modelFolder + "/" + versions[i];
+      client.logArtifact(objectPath, payload);
+      expectedPaths[i] = bucket + "/" + objectPath;
+    }
+
+    List<String> listed = client.listArtifactByExperimentId(testExperimentId);
+    Assert.assertArrayEquals(expectedPaths, listed.toArray());
+
+    client.deleteArtifactsByExperiment(testExperimentId);
+
+    List<String> remaining = client.listArtifactByExperimentId(testExperimentId);
+    Assert.assertArrayEquals(new String[0], remaining.toArray());
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org