You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/13 01:07:11 UTC
[incubator-streampark] branch dev updated: [Feature] implement Oss Storage Service (#1579)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new fb09de044 [Feature] implement Oss Storage Service (#1579)
fb09de044 is described below
commit fb09de044ecc88a3feecf50ee42c7d3b226a8f46
Author: legendtkl <ta...@bytedance.com>
AuthorDate: Tue Sep 13 09:07:03 2022 +0800
[Feature] implement Oss Storage Service (#1579)
* [Feature] implement Oss Storage Service
---
pom.xml | 16 +++-
streampark-common/pom.xml | 6 +-
streampark-flink/pom.xml | 6 +-
streampark-storage/pom.xml | 26 +++++++
.../apache/streampark/storage/StorageService.java | 7 +-
...orageService.java => StorageServiceConfig.java} | 5 +-
.../{StorageService.java => oss/OssConfig.java} | 15 +++-
.../streampark/storage/oss/OssStorageService.java | 88 ++++++++++++++++++++++
.../storage/oss/OssStorageServiceTest.java | 37 +++++++++
9 files changed, 189 insertions(+), 17 deletions(-)
diff --git a/pom.xml b/pom.xml
index 524039883..826a2545c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,7 @@
<hive.version>2.3.4</hive.version>
<hadoop.version>2.10.2</hadoop.version>
<hbase.version>2.1.10</hbase.version>
- <junit.version>5.6.3</junit.version>
+ <jupiter.version>5.6.3</jupiter.version>
<zkclient.version>0.11</zkclient.version>
<curator.version>4.2.0</curator.version>
<redis.version>3.3.0</redis.version>
@@ -128,6 +128,8 @@
<streampark.shaded.package>org.apache.streampark.shaded</streampark.shaded.package>
<flink.table.uber.artifact.id>flink-table-uber_${scala.binary.version}</flink.table.uber.artifact.id>
<httpclient.version>4.5.13</httpclient.version>
+ <lombok.version>1.18.24</lombok.version>
+ <junit.version>4.13.2</junit.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
@@ -535,6 +537,18 @@
</dependency>
<!--log4j end-->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 03db90f21..3e285c1bd 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -35,21 +35,21 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 8c8eb7512..0dbec5711 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -55,21 +55,21 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
- <version>${junit.version}</version>
+ <version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml
index 45089b740..bbccf6fd6 100644
--- a/streampark-storage/pom.xml
+++ b/streampark-storage/pom.xml
@@ -33,6 +33,32 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <aliyun.oss.version>3.15.0</aliyun.oss.version>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ <version>${aliyun.oss.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
</project>
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
index ed19d1e59..bd37cbff3 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
@@ -17,8 +17,11 @@
package org.apache.streampark.storage;
+/**
+ * StorageService will be used as the artifact fetcher in the pod template, so it should not rely on other modules.
+ */
public interface StorageService {
- byte[] getData(String objectPath);
+ void getData(String objectPath, String localFilePath) throws Exception;
- Boolean putData(String objectPath, byte[] data);
+ void putData(String objectPath, String localFilePath) throws Exception;
}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
similarity index 86%
copy from streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
copy to streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
index ed19d1e59..0b348baca 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
@@ -17,8 +17,5 @@
package org.apache.streampark.storage;
-public interface StorageService {
- byte[] getData(String objectPath);
-
- Boolean putData(String objectPath, byte[] data);
+public abstract class StorageServiceConfig {
}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
similarity index 70%
copy from streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
copy to streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
index ed19d1e59..6d922014a 100644
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
@@ -15,10 +15,17 @@
* limitations under the License.
*/
-package org.apache.streampark.storage;
+package org.apache.streampark.storage.oss;
-public interface StorageService {
- byte[] getData(String objectPath);
+import org.apache.streampark.storage.StorageServiceConfig;
- Boolean putData(String objectPath, byte[] data);
+import lombok.Data;
+
+@Data
+public class OssConfig extends StorageServiceConfig {
+ private String accessKeyId;
+ private String accessKeySecret;
+ private String endpoint;
+ private String bucket;
+ private String baseUri;
}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
new file mode 100644
index 000000000..c7efd1309
--- /dev/null
+++ b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.storage.oss;
+
+import org.apache.streampark.storage.StorageService;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+
+/**
+ * {@link StorageService} implementation backed by Aliyun OSS.
+ *
+ * <p>Objects are copied between the bucket named in the supplied {@link OssConfig} and files on
+ * the local file system. Transfer failures are logged and rethrown as {@link RuntimeException}s
+ * built by {@link #handleOssException(Exception)}.
+ *
+ * <p>Note: the OSS client created in the constructor is not shut down by this class.
+ */
+@Slf4j
+public class OssStorageService implements StorageService {
+
+    final OssConfig ossConfig;
+    final OSS ossClient;
+
+    /**
+     * Creates the service and builds an OSS client from the endpoint and access key pair
+     * found in {@code config}.
+     *
+     * @param config OSS connection settings; also supplies the bucket used by every operation
+     */
+    public OssStorageService(OssConfig config) {
+        this.ossConfig = config;
+        this.ossClient = new OSSClientBuilder()
+            .build(ossConfig.getEndpoint(), ossConfig.getAccessKeyId(), ossConfig.getAccessKeySecret());
+    }
+
+    /**
+     * Downloads the object stored under {@code objectPath} in the configured bucket into the
+     * local file {@code localFilePath}.
+     *
+     * @param objectPath key of the object inside the configured bucket
+     * @param localFilePath path of the local file that receives the object content
+     * @throws RuntimeException if the object does not exist, or if the download fails (the
+     *     original exception is kept as the cause)
+     */
+    @Override
+    public void getData(String objectPath, String localFilePath) throws Exception {
+        String bucket = ossConfig.getBucket();
+
+        // The existence check runs outside the try block, so an exception raised by the check
+        // itself propagates as-is and is not routed through handleOssException.
+        if (!ossClient.doesObjectExist(bucket, objectPath)) {
+            throw new RuntimeException(String.format("File '%s' not exist", objectPath));
+        }
+
+        try {
+            ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath));
+        } catch (Exception e) {
+            // SLF4J only substitutes "{}" placeholders; the trailing Throwable is logged with its stack trace.
+            log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
+            throw handleOssException(e);
+        }
+    }
+
+    /**
+     * Uploads the local file {@code localFilePath} to the configured bucket under the key
+     * {@code objectPath}.
+     *
+     * @param objectPath key the object is stored under inside the configured bucket
+     * @param localFilePath path of the local file to upload
+     * @throws RuntimeException if the upload fails (the original exception is kept as the cause)
+     */
+    @Override
+    public void putData(String objectPath, String localFilePath) throws Exception {
+        try {
+            PutObjectRequest putObjectRequest =
+                new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath));
+            ossClient.putObject(putObjectRequest);
+        } catch (Exception e) {
+            // SLF4J only substitutes "{}" placeholders; the trailing Throwable is logged with its stack trace.
+            log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
+            throw handleOssException(e);
+        }
+    }
+
+    /**
+     * Converts an exception caught around an OSS call into a {@link RuntimeException} whose
+     * cause is the original exception.
+     *
+     * <ul>
+     *   <li>{@link OSSException}: the message carries the error message, error code and request id.</li>
+     *   <li>{@link ClientException}: the message carries the client exception's message.</li>
+     *   <li>Any other exception: wrapped without a custom message.</li>
+     * </ul>
+     *
+     * @param e exception to convert
+     * @return the wrapping exception; it is returned, not thrown, so callers can {@code throw} it
+     */
+    @VisibleForTesting
+    static RuntimeException handleOssException(Exception e) {
+        if (e instanceof OSSException) {
+            OSSException oe = (OSSException) e;
+            String errMsg = String.format("Caught an OSSException. Error Message: %s." +
+                    " Error Code: %s. Request ID: %s", oe.getErrorMessage(), oe.getErrorCode(),
+                oe.getRequestId());
+            return new RuntimeException(errMsg, oe);
+        } else if (e instanceof ClientException) {
+            ClientException ce = (ClientException) e;
+            String errMsg = String.format("Caught an ClientException. Error Message: %s.", ce.getMessage());
+            return new RuntimeException(errMsg, ce);
+        } else {
+            return new RuntimeException(e);
+        }
+    }
+}
diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
new file mode 100644
index 000000000..35f7683f4
--- /dev/null
+++ b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.storage.oss;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OssStorageService#handleOssException(Exception)}.
+ */
+public class OssStorageServiceTest {
+
+    /**
+     * Checks the message built for each supported exception type and that the original
+     * exception is always kept as the cause.
+     */
+    @Test
+    public void testHandleException() throws Exception {
+        OSSException ossException = new OSSException("mock error", "MOCK_CODE", "requestId", "hostId", "header", "resource", "GET");
+        RuntimeException exp = OssStorageService.handleOssException(ossException);
+        // assertEquals reports both values on failure, unlike assertTrue(a.equals(b)).
+        Assert.assertEquals(
+            "Caught an OSSException. Error Message: mock error. Error Code: MOCK_CODE. Request ID: requestId",
+            exp.getMessage());
+        Assert.assertSame(ossException, exp.getCause());
+
+        ClientException ossClientException = new ClientException("Client ERROR");
+        exp = OssStorageService.handleOssException(ossClientException);
+        // Prefix match only: the original test does not pin the full text of ClientException#getMessage().
+        Assert.assertTrue(exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR"));
+        Assert.assertSame(ossClientException, exp.getCause());
+
+        // Any other exception type is wrapped as the cause without a custom message.
+        Exception other = new IllegalStateException("unexpected");
+        exp = OssStorageService.handleOssException(other);
+        Assert.assertSame(other, exp.getCause());
+    }
+}