You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/12/28 06:39:05 UTC
[submarine] branch master updated: SUBMARINE-1074. S3 client for server
This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 5b426b5 SUBMARINE-1074. S3 client for server
5b426b5 is described below
commit 5b426b5d691b6ab24b743bf105a1e6ef4b7559b4
Author: jeff-901 <b0...@ntu.edu.tw>
AuthorDate: Sat Dec 25 10:25:10 2021 +0800
SUBMARINE-1074. S3 client for server
### What is this PR for?
Add an S3 (MinIO) client to the server.
### What type of PR is it?
Feature
### Todos
* [x] - list artifact paths by experiment id
* [x] - upload an artifact
* [x] - download an artifact
* [x] - delete artifacts under an experiment id
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1074
### How should this be tested?
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: jeff-901 <b0...@ntu.edu.tw>
Signed-off-by: Kevin <pi...@apache.org>
Closes #845 from jeff-901/SUBMARINE-1074 and squashes the following commits:
51504561 [jeff-901] try with resource
de64081d [jeff-901] create default bucket
527073fe [jeff-901] update test minio url
f5de64df [jeff-901] add minio pod in test
96705061 [jeff-901] dependency and license
1a784225 [jeff-901] add s3 client
16ef4883 [jeff-901] add client
---
.github/workflows/master.yml | 8 +
pom.xml | 3 +
submarine-server/server-core/pom.xml | 24 +++
.../org/apache/submarine/server/s3/Client.java | 172 +++++++++++++++++++++
.../apache/submarine/server/s3/S3Constants.java | 30 ++++
.../org/apache/submarine/server/s3/ClientTest.java | 67 ++++++++
6 files changed, 304 insertions(+)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 2d851dc..12ee564 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -351,6 +351,14 @@ jobs:
- 3306:3306
# wait until mysql is health
options: --health-cmd "mysqladmin ping" --health-interval 10s --health-timeout 10s --health-retries 10
+ # MinIO (S3-compatible) service for the server S3 client tests: ClientTest connects to
+ # http://localhost:9000, and the access key, secret key and default bucket below use the
+ # same values as S3Constants.
+ minio:
+ image: bitnami/minio:latest
+ env:
+ MINIO_ACCESS_KEY: "submarine_minio"
+ MINIO_SECRET_KEY: "submarine_minio"
+ MINIO_DEFAULT_BUCKETS: "submarine"
+ ports:
+ - 9000:9000
steps:
- uses: actions/checkout@v2
with:
diff --git a/pom.xml b/pom.xml
index 26a4d84..176735b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,10 +76,13 @@
<gson.version>2.8.5</gson.version>
<jackson-databind.version>2.11.0</jackson-databind.version>
<jackson-annotations.version>2.11.0</jackson-annotations.version>
+ <jackson-core.version>2.11.0</jackson-core.version>
<jackson-module-jaxb-annotations.version>2.11.0</jackson-module-jaxb-annotations.version>
<commons-configuration.version>1.10</commons-configuration.version>
<commons-httpclient.version>3.1</commons-httpclient.version>
+ <minio.version>7.1.4</minio.version>
+
<cglib.version>3.3.0</cglib.version>
<hibernate.version>5.6.0.Final</hibernate.version>
<jboss.logging.version>3.4.2.Final</jboss.logging.version>
diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml
index 53a4183..9dd3d16 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -315,6 +315,11 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- jackson-core pinned to the version property declared in the root pom.
+ NOTE(review): presumably declared explicitly because the MinIO dependency in this file
+ excludes its own jackson-core; confirm. -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson-core.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
@@ -418,6 +423,25 @@
<version>1.18.0</version>
</dependency>
+ <!-- MinIO Java SDK used by org.apache.submarine.server.s3.Client.
+ The Jackson artifacts it pulls in transitively are excluded.
+ NOTE(review): presumably so that the Jackson versions declared by this project win; confirm. -->
+ <dependency>
+ <groupId>io.minio</groupId>
+ <artifactId>minio</artifactId>
+ <version>${minio.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java
new file mode 100644
index 0000000..02e4cb6
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/Client.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import javax.ws.rs.core.Response;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.DeleteError;
+import io.minio.messages.DeleteObject;
+import io.minio.messages.Item;
+import org.apache.submarine.commons.utils.exception.SubmarineRuntimeException;
+
+
+/**
+ * Server-side S3 client backed by MinIO. Every operation acts on the bucket
+ * {@link S3Constants#BUCKET} and authenticates with the keys defined in {@link S3Constants}.
+ */
+public class Client {
+  /** Underlying MinIO client; every method of this class delegates to it. */
+  public MinioClient minioClient;
+
+  /**
+   * Create a client connected to the default endpoint {@link S3Constants#ENDPOINT}.
+   */
+  public Client() {
+    this(S3Constants.ENDPOINT);
+  }
+
+  /**
+   * Create a client connected to the given endpoint.
+   *
+   * @param endpoint URL of the S3/MinIO service, e.g. "http://localhost:9000"
+   */
+  public Client(String endpoint) {
+    minioClient = MinioClient.builder()
+        .endpoint(endpoint)
+        .credentials(S3Constants.ACCESSKEY, S3Constants.SECRETKEY)
+        .build();
+  }
+
+  /**
+   * Get a list of artifact paths under the experiment.
+   *
+   * <p>The listing is two levels deep: every entry found directly under "experimentId/" is used
+   * as the prefix of a second, non-recursive listing, and the entries of that second listing
+   * are the ones returned.
+   *
+   * @param experimentId experiment id
+   * @return artifact paths formatted as "s3://" + bucket + "/" + object name
+   * @throws SubmarineRuntimeException if an entry of either listing cannot be read
+   */
+  public List<String> listArtifactByExperimentId(String experimentId) throws SubmarineRuntimeException {
+    Iterable<Result<Item>> artifactNames = minioClient.listObjects(ListObjectsArgs.builder()
+        .bucket(S3Constants.BUCKET).prefix(experimentId + "/").delimiter("/").build());
+    List<String> response = new ArrayList<>();
+    for (Result<Item> artifactName : artifactNames) {
+      try {
+        Iterable<Result<Item>> artifacts = minioClient.listObjects(ListObjectsArgs.builder()
+            .bucket(S3Constants.BUCKET)
+            .prefix(artifactName.get().objectName()).delimiter("/").build());
+        for (Result<Item> artifact : artifacts) {
+          response.add("s3://" + S3Constants.BUCKET + "/" + artifact.get().objectName());
+        }
+      } catch (Exception e) {
+        throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+            e.getMessage());
+      }
+    }
+    return response;
+  }
+
+  /**
+   * Delete all the artifacts under the given experiment id.
+   *
+   * @param experimentId experiment id
+   * @throws SubmarineRuntimeException if listing or deleting fails
+   */
+  public void deleteArtifactsByExperiment(String experimentId) {
+    deleteAllArtifactsByFolder(experimentId);
+  }
+
+  /**
+   * Delete all the artifacts in the bucket (s3://submarine).
+   *
+   * @throws SubmarineRuntimeException if listing or deleting fails
+   */
+  public void deleteAllArtifacts() {
+    deleteAllArtifactsByFolder("");
+  }
+
+  /**
+   * Download an artifact.
+   *
+   * @param path object name of the artifact inside the bucket
+   * @return the full content of the artifact; an empty array if the artifact is empty
+   * @throws SubmarineRuntimeException if the object cannot be fetched or read
+   */
+  public byte[] downloadArtifact(String path) {
+    try (InputStream is = minioClient.getObject(
+            GetObjectArgs.builder()
+                .bucket(S3Constants.BUCKET)
+                .object(path)
+                .build());
+        ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      // Read until end of stream. InputStream.available() only estimates what can be read
+      // without blocking, so it must not be used as the object size.
+      byte[] chunk = new byte[8192];
+      int read;
+      while ((read = is.read(chunk)) != -1) {
+        out.write(chunk, 0, read);
+      }
+      return out.toByteArray();
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Upload an artifact.
+   *
+   * @param path object name to store the artifact under
+   * @param content content of the artifact
+   * @throws SubmarineRuntimeException if the upload fails
+   */
+  public void logArtifact(String path, byte[] content) throws SubmarineRuntimeException {
+    InputStream targetStream = new ByteArrayInputStream(content);
+    try {
+      // The object size is known up front, so no part size is given (-1).
+      minioClient.putObject(PutObjectArgs.builder()
+          .bucket(S3Constants.BUCKET)
+          .object(path)
+          .stream(targetStream, content.length, -1)
+          .build());
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Delete all objects under the given folder path.
+   *
+   * @param folder folder (object name prefix without trailing "/"); an empty string selects
+   *               the whole bucket
+   * @throws SubmarineRuntimeException if listing fails or any object cannot be deleted
+   */
+  private void deleteAllArtifactsByFolder(String folder) {
+    // An empty folder means "everything": use an empty prefix rather than "/".
+    String prefix = folder.isEmpty() ? "" : folder + "/";
+    Iterable<Result<Item>> artifactNames = minioClient.listObjects(ListObjectsArgs.builder()
+        .bucket(S3Constants.BUCKET).prefix(prefix).recursive(true).build());
+    List<DeleteObject> objects = new LinkedList<>();
+    try {
+      // Collect every object name first so that a single delete request covers them all.
+      for (Result<Item> artifactName : artifactNames) {
+        objects.add(new DeleteObject(artifactName.get().objectName()));
+      }
+      if (objects.isEmpty()) {
+        return;
+      }
+      Iterable<Result<DeleteError>> results = minioClient.removeObjects(
+          RemoveObjectsArgs.builder().bucket(S3Constants.BUCKET).objects(objects).build());
+      // removeObjects is lazy: iterating the result is what performs the removal.
+      // Each element yielded describes one object that could not be deleted.
+      for (Result<DeleteError> result : results) {
+        DeleteError error = result.get();
+        throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+            "Error in deleting object " + error.objectName() + "; " + error.message());
+      }
+    } catch (SubmarineRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SubmarineRuntimeException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          e.getMessage());
+    }
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java
new file mode 100644
index 0000000..c92cacf
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/s3/S3Constants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+public class S3Constants {
+ public static final String ENDPOINT = "http://submarine-minio-service:9000";
+
+ public static final String ACCESSKEY = "submarine_minio";
+
+ public static final String SECRETKEY = "submarine_minio";
+
+ public static final String BUCKET = "submarine";
+}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java
new file mode 100644
index 0000000..ff6ed97
--- /dev/null
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/s3/ClientTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.server.s3;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.List;
+
+
+/**
+ * Tests for the S3 {@link Client}. They talk to a live service at http://localhost:9000
+ * and remove every artifact again after each test.
+ */
+public class ClientTest {
+  private final String experimentId = "experiment-sample";
+  private final String bucketUri = "s3://submarine";
+  private Client client = new Client("http://localhost:9000");
+
+  /** Wipe the bucket so that tests do not see each other's artifacts. */
+  @After
+  public void cleanAll() {
+    client.deleteAllArtifacts();
+  }
+
+  /** An uploaded artifact must be downloaded byte for byte. */
+  @Test
+  public void testLogArtifactAndDownloadArtifact() {
+    byte[] uploaded = "0123456789".getBytes();
+    String objectName = "sample_folder/sample_file";
+
+    client.logArtifact(objectName, uploaded);
+
+    byte[] downloaded = client.downloadArtifact(objectName);
+    Assert.assertArrayEquals(uploaded, downloaded);
+  }
+
+  /** Listing returns the uploaded artifacts, and nothing once the experiment is deleted. */
+  @Test
+  public void testListArtifactByExperimentIdAndDeleteArtifactByExperiment() {
+    byte[] payload = "0123456789".getBytes();
+    String modelFolder = experimentId + "/" + "sample";
+    String firstArtifact = modelFolder + "/1";
+    String secondArtifact = modelFolder + "/2";
+    String[] expectedPaths = {
+        bucketUri + "/" + firstArtifact,
+        bucketUri + "/" + secondArtifact};
+
+    client.logArtifact(firstArtifact, payload);
+    client.logArtifact(secondArtifact, payload);
+    List<String> listed = client.listArtifactByExperimentId(experimentId);
+    Assert.assertArrayEquals(expectedPaths, listed.toArray());
+
+    client.deleteArtifactsByExperiment(experimentId);
+    listed = client.listArtifactByExperimentId(experimentId);
+    Assert.assertArrayEquals(new String[0], listed.toArray());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org