You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/13 01:07:11 UTC

[incubator-streampark] branch dev updated: [Feature] implement Oss Storage Service (#1579)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new fb09de044 [Feature] implement Oss Storage Service (#1579)
fb09de044 is described below

commit fb09de044ecc88a3feecf50ee42c7d3b226a8f46
Author: legendtkl <ta...@bytedance.com>
AuthorDate: Tue Sep 13 09:07:03 2022 +0800

    [Feature] implement Oss Storage Service (#1579)
    
    * [Feature] implement Oss Storage Service
---
 pom.xml                                            | 16 +++-
 streampark-common/pom.xml                          |  6 +-
 streampark-flink/pom.xml                           |  6 +-
 streampark-storage/pom.xml                         | 26 +++++++
 .../apache/streampark/storage/StorageService.java  |  7 +-
 ...orageService.java => StorageServiceConfig.java} |  5 +-
 .../{StorageService.java => oss/OssConfig.java}    | 15 +++-
 .../streampark/storage/oss/OssStorageService.java  | 88 ++++++++++++++++++++++
 .../storage/oss/OssStorageServiceTest.java         | 37 +++++++++
 9 files changed, 189 insertions(+), 17 deletions(-)

diff --git a/pom.xml b/pom.xml
index 524039883..826a2545c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
         <hive.version>2.3.4</hive.version>
         <hadoop.version>2.10.2</hadoop.version>
         <hbase.version>2.1.10</hbase.version>
-        <junit.version>5.6.3</junit.version>
+        <jupiter.version>5.6.3</jupiter.version>
         <zkclient.version>0.11</zkclient.version>
         <curator.version>4.2.0</curator.version>
         <redis.version>3.3.0</redis.version>
@@ -128,6 +128,8 @@
         <streampark.shaded.package>org.apache.streampark.shaded</streampark.shaded.package>
         <flink.table.uber.artifact.id>flink-table-uber_${scala.binary.version}</flink.table.uber.artifact.id>
         <httpclient.version>4.5.13</httpclient.version>
+        <lombok.version>1.18.24</lombok.version>
+        <junit.version>4.13.2</junit.version>
 
         <PermGen>64m</PermGen>
         <MaxPermGen>512m</MaxPermGen>
@@ -535,6 +537,18 @@
             </dependency>
             <!--log4j end-->
 
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>${lombok.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+                <version>${junit.version}</version>
+            </dependency>
+
         </dependencies>
 
     </dependencyManagement>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 03db90f21..3e285c1bd 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -35,21 +35,21 @@
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 8c8eb7512..0dbec5711 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -55,21 +55,21 @@
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
-            <version>${junit.version}</version>
+            <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml
index 45089b740..bbccf6fd6 100644
--- a/streampark-storage/pom.xml
+++ b/streampark-storage/pom.xml
@@ -33,6 +33,32 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <aliyun.oss.version>3.15.0</aliyun.oss.version>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>${aliyun.oss.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
 </project>
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
index ed19d1e59..bd37cbff3 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
@@ -17,8 +17,11 @@
 
 package org.apache.streampark.storage;
 
+/**
+ * StorageService will be used as an artifact fetcher in the pod template, so it should not rely on other modules.
+ */
 public interface StorageService {
-    byte[] getData(String objectPath);
+    /**
+     * Fetches the object stored under {@code objectPath} into the local file {@code localFilePath}.
+     */
+    void getData(String objectPath, String localFilePath) throws Exception;
 
-    Boolean putData(String objectPath, byte[] data);
+    /**
+     * Stores the local file {@code localFilePath} under {@code objectPath}.
+     */
+    void putData(String objectPath, String localFilePath) throws Exception;
 }
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
similarity index 86%
copy from streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
copy to streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
index ed19d1e59..0b348baca 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
@@ -17,8 +17,5 @@
 
 package org.apache.streampark.storage;
 
-public interface StorageService {
-    byte[] getData(String objectPath);
-
-    Boolean putData(String objectPath, byte[] data);
+/**
+ * Common base type for storage service configurations; declares no members of its own.
+ */
+public abstract class StorageServiceConfig {
 }
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
similarity index 70%
copy from streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
copy to streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
index ed19d1e59..6d922014a 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
@@ -15,10 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.storage;
+package org.apache.streampark.storage.oss;
 
-public interface StorageService {
-    byte[] getData(String objectPath);
+import org.apache.streampark.storage.StorageServiceConfig;
 
-    Boolean putData(String objectPath, byte[] data);
+import lombok.Data;
+
+/**
+ * Settings consumed by OssStorageService. Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class OssConfig extends StorageServiceConfig {
+    // Access key id handed to the OSS client builder.
+    private String accessKeyId;
+    // Access key secret handed to the OSS client builder.
+    private String accessKeySecret;
+    // Endpoint handed to the OSS client builder.
+    private String endpoint;
+    // Bucket that getData/putData read from and write to.
+    private String bucket;
+    // NOTE(review): not read by any code visible in this commit; intended use to be confirmed.
+    private String baseUri;
 }
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
new file mode 100644
index 000000000..c7efd1309
--- /dev/null
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.storage.oss;
+
+import org.apache.streampark.storage.StorageService;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+
+/**
+ * {@link StorageService} implementation that copies files between the local
+ * file system and an Aliyun OSS bucket.
+ *
+ * <p>The endpoint, credentials and bucket come from the {@link OssConfig}
+ * passed to the constructor.
+ */
+@Slf4j
+public class OssStorageService implements StorageService {
+
+    final OssConfig ossConfig;
+    final OSS ossClient;
+
+    /**
+     * Builds the OSS client from the endpoint and access key pair of the given config.
+     *
+     * @param config OSS settings; its bucket is used by every get/put call
+     */
+    public OssStorageService(OssConfig config) {
+        this.ossConfig = config;
+        this.ossClient = new OSSClientBuilder()
+            .build(ossConfig.getEndpoint(), ossConfig.getAccessKeyId(), ossConfig.getAccessKeySecret());
+    }
+
+    /**
+     * Downloads an object from the configured bucket into a local file.
+     *
+     * @param objectPath    key of the object inside the bucket
+     * @param localFilePath path of the local file the object is written to
+     * @throws RuntimeException if the object does not exist or the download fails
+     */
+    @Override
+    public void getData(String objectPath, String localFilePath) throws Exception {
+        String bucket = ossConfig.getBucket();
+
+        if (!ossClient.doesObjectExist(bucket, objectPath)) {
+            throw new RuntimeException(String.format("File '%s' not exist", objectPath));
+        }
+
+        try {
+            ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath));
+        } catch (Exception e) {
+            // SLF4J substitutes "{}" placeholders (not "%s"); the trailing exception is logged as the cause.
+            log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
+            throw handleOssException(e);
+        }
+    }
+
+    /**
+     * Uploads a local file to the configured bucket.
+     *
+     * @param objectPath    key under which the file is stored in the bucket
+     * @param localFilePath path of the local file to upload
+     * @throws RuntimeException if the upload fails
+     */
+    @Override
+    public void putData(String objectPath, String localFilePath) throws Exception {
+        try {
+            PutObjectRequest putObjectRequest = new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath));
+            ossClient.putObject(putObjectRequest);
+        } catch (Exception e) {
+            // SLF4J substitutes "{}" placeholders (not "%s"); the trailing exception is logged as the cause.
+            log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
+            throw handleOssException(e);
+        }
+    }
+
+    /**
+     * Wraps an exception in a {@link RuntimeException}, keeping the original as the cause.
+     *
+     * <p>For {@link OSSException} the message carries the error message, error code and
+     * request id; for {@link ClientException} it carries the exception message; any other
+     * exception is wrapped without a custom message.
+     *
+     * @param e exception caught around an OSS client call
+     * @return the wrapping exception, to be thrown by the caller
+     */
+    @VisibleForTesting
+    static RuntimeException handleOssException(Exception e) {
+        if (e instanceof OSSException) {
+            OSSException oe = (OSSException) e;
+            String errMsg = String.format("Caught an OSSException. Error Message: %s." +
+                    " Error Code: %s. Request ID: %s", oe.getErrorMessage(), oe.getErrorCode(),
+                oe.getRequestId());
+            return new RuntimeException(errMsg, oe);
+        } else if (e instanceof ClientException) {
+            ClientException ce = (ClientException) e;
+            String errMsg = String.format("Caught an ClientException. Error Message: %s.", ce.getMessage());
+            return new RuntimeException(errMsg, ce);
+        } else {
+            return new RuntimeException(e);
+        }
+    }
+}
diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
new file mode 100644
index 000000000..35f7683f4
--- /dev/null
+++ b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.storage.oss;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OssStorageService}.
+ */
+public class OssStorageServiceTest {
+
+    /**
+     * Verifies the messages of the exceptions produced by
+     * {@link OssStorageService#handleOssException(Exception)} for OSS server-side
+     * and client-side errors.
+     */
+    @Test
+    public void testHandleException() throws Exception {
+        OSSException ossException = new OSSException("mock error", "MOCK_CODE", "requestId", "hostId", "header", "resource", "GET");
+        RuntimeException exp = OssStorageService.handleOssException(ossException);
+        // assertEquals reports expected vs. actual on failure, unlike assertTrue(a.equals(b)).
+        Assert.assertEquals("Caught an OSSException. Error Message: mock error. Error Code: MOCK_CODE. Request ID: requestId", exp.getMessage());
+
+        ClientException ossClientException = new ClientException("Client ERROR");
+        exp = OssStorageService.handleOssException(ossClientException);
+        // Prefix match kept from the original assertion; only the start of the message is pinned.
+        Assert.assertTrue(exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR"));
+    }
+}