You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/01/17 08:07:53 UTC

[incubator-seatunnel] branch dev updated: [Feature][Zeta][checkpoint-storage] add oss support for checkpoint storing. (#3732)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a5b163bbb [Feature][Zeta][checkpoint-storage] add oss support for checkpoint storing. (#3732)
a5b163bbb is described below

commit a5b163bbb2b8cc6e09d83e8d17f74453f83768a9
Author: PengYuan <yu...@gmail.com>
AuthorDate: Tue Jan 17 16:07:48 2023 +0800

    [Feature][Zeta][checkpoint-storage] add oss support for checkpoint storing. (#3732)
    
    * add oss support for checkpoint storing.
    
    * fix checkstyle for checkpoint oss support.
    
    * modify oss link order.
    
    * add doc for oss checkpoint storage settings.
    
    * add comment for oss configuration.
    
    * manage hadoop3 version by properties;add oss schema for hdfs
    
    * fix test disable info describe error.
    
    * remove no use hadoop 3 version from parent pom
    
    * remove hadoop-aliyun dependency from checkpoint-storage-hdfs pom
    
    * fix checkpoint-storage.md error and increase java mem
    
    * increase unit-test java mem
    
    * [Feature][Zeta][Checkpoint-Storage] Fix pom
    
    Co-authored-by: 鹏赋 <pe...@alibaba-inc.com>
    Co-authored-by: tyrantlucifer <ty...@gmail.com>
---
 .github/workflows/backend.yml                      | 12 +++---
 docs/en/seatunnel-engine/checkpoint-storage.md     | 36 ++++++++++++++--
 pom.xml                                            |  1 -
 .../checkpoint-storage-hdfs/pom.xml                | 11 +++++
 .../storage/hdfs/HdfsStorageFactory.java           | 12 +++++-
 .../storage/hdfs/common/FileConfiguration.java     |  7 ++--
 .../storage/hdfs/common/OssConfiguration.java      | 49 ++++++++++++++++++++++
 .../storage/hdfs/OssFileCheckpointTest.java        | 45 ++++++++++++++++++++
 8 files changed, 159 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index d1025b0f4..12472f681 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -304,14 +304,14 @@ jobs:
         run: |
           ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates
         env:
-          MAVEN_OPTS: -Xmx2048m
+          MAVEN_OPTS: -Xmx4096m
 
       - name: run updated modules unit test
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.ut-modules != ''
         run: |
           ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl ${{needs.changes.outputs.ut-modules}} -am -Pci
         env:
-          MAVEN_OPTS: -Xmx2048m
+          MAVEN_OPTS: -Xmx4096m
 
   updated-modules-integration-test:
     needs: [ changes, sanity-check ]
@@ -359,7 +359,7 @@ jobs:
         run: |
           ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e,:connector-seatunnel-e2e-base -am -Pci
         env:
-          MAVEN_OPTS: -Xmx2048m
+          MAVEN_OPTS: -Xmx4096m
 
   all-connectors-it-1:
     needs: [ changes, sanity-check ]
@@ -388,7 +388,7 @@ jobs:
           run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module $sub_modules 3 0`
           ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci
         env:
-          MAVEN_OPTS: -Xmx2048m
+          MAVEN_OPTS: -Xmx4096m
 
   all-connectors-it-2:
     needs: [ changes, sanity-check ]
@@ -417,7 +417,7 @@ jobs:
           run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module $sub_modules 3 1`
           ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci
         env:
-          MAVEN_OPTS: -Xmx2048m
+          MAVEN_OPTS: -Xmx4096m
 
   all-connectors-it-3:
     needs: [ changes, sanity-check ]
@@ -446,4 +446,4 @@ jobs:
           run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module $sub_modules 3 2`
           ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $run_it_modules -am -Pci
         env:
-          MAVEN_OPTS: -Xmx2048m
\ No newline at end of file
+          MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md
index 31fb4a6ba..b6871e043 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -11,7 +11,7 @@ Checkpoint Storage is a storage mechanism for storing checkpoint data.
 
 SeaTunnel Engine supports the following checkpoint storage types:
 
-- HDFS (S3,HDFS,LocalFile)
+- HDFS (OSS,S3,HDFS,LocalFile)
 - LocalFile (native) (deprecated: use Hdfs(LocalFile) instead).
 
 We used the microkernel design pattern to separate the checkpoint storage module from the engine. This allows users to implement their own checkpoint storage modules.
@@ -36,8 +36,39 @@ seatunnel:
                   K1: V1 # plugin other configuration
                   K2: V2 # plugin other configuration   
 ```
+#### OSS
+Aliyun OSS is based on hdfs-file, so you can refer to the [hadoop oss docs](https://hadoop.apache.org/docs/stable/hadoop-aliyun/tools/hadoop-aliyun/index.html) to configure OSS.
+
+When interacting with OSS buckets, the OSS client needs credentials in order to access them.
+The client supports multiple authentication mechanisms and can be configured as to which mechanisms to use, and their order of use. Custom implementations of org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider may also be used.
+If you use AliyunCredentialsProvider (the credentials can be obtained from the Aliyun Access Key Management), these consist of an access key and a secret key.
+You can configure it like this:
+```yaml
+seatunnel:
+  engine:
+    checkpoint:
+      interval: 6000
+      timeout: 7000
+      max-concurrent: 5
+      tolerable-failure: 2
+      storage:
+        type: hdfs
+        max-retained: 3
+        plugin-config:
+          storage-type: oss
+          oss.bucket: your-bucket
+          fs.oss.accessKeyId: your-access-key
+          fs.oss.accessKeySecret: your-secret-key
+          fs.oss.endpoint: endpoint address
+          fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
+```
+For additional reading on the Hadoop Credential Provider API see: [Credential Provider API](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html).
+
+Aliyun oss Credential Provider implements see: [Auth Credential Providers](https://github.com/aliyun/aliyun-oss-java-sdk/tree/master/src/main/java/com/aliyun/oss/common/auth)
+
+
 #### S3
-S3 base on hdfs-file, so you can refer [hadoop docs](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) to config s3.
+S3 is based on hdfs-file, so you can refer to the [hadoop s3 docs](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) to configure S3.
 
 Except when interacting with public S3 buckets, the S3A client needs the credentials needed to interact with buckets.
 The client supports multiple authentication mechanisms and can be configured as to which mechanisms to use, and their order of use. Custom implementations of com.amazonaws.auth.AWSCredentialsProvider may also be used.
@@ -60,7 +91,6 @@ seatunnel:
                 plugin-config:
                     storage-type: s3
                     s3.bucket: your-bucket
-                    fs.s3a.endpoint: your-endpoint
                     fs.s3a.access-key: your-access-key
                     fs.s3a.secret-key: your-secret-key
                     fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
diff --git a/pom.xml b/pom.xml
index 7b03c043f..d30a1609a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,7 +191,6 @@
         <guava.version>19.0</guava.version>
         <auto-service.version>1.0.1</auto-service.version>
         <hadoop2.version>2.6.5</hadoop2.version>
-        <hadoop3.version>3.0.0</hadoop3.version>
         <seatunnel.shade.package>org.apache.seatunnel.shade</seatunnel.shade.package>
         <snappy-java.version>1.1.8.3</snappy-java.version>
         <checker.qual.version>3.10.0</checker.qual.version>
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
index 56f5e379c..0bd9118c4 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
@@ -29,6 +29,11 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>checkpoint-storage-hdfs</artifactId>
+
+    <properties>
+        <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -37,6 +42,12 @@
             <scope>provided</scope>
             <classifier>optional</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aliyun</artifactId>
+            <version>${hadoop-aliyun.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index ec494fbeb..3dcf9f38b 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -33,7 +33,7 @@ import java.util.Map;
  * HdfsCheckpointStorageFactory.
  * if you want to use HdfsCheckpointStorage, you should add the following configuration in the configuration file:
  * <pre>
- *      storage.type = hdfs # hdfs, local(default),s3
+ *      storage.type = hdfs # hdfs, local(default),s3, oss
  *  </pre>
  * then you need to configure the following parameters by the storage.type:
  * hdfs  {@link org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.HdfsConfiguration}
@@ -47,6 +47,16 @@ import java.util.Map;
  *      s3.bucket= "s3a://your bucket"
  *      fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  *  </pre>
+ * oss   {@link org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.OssConfiguration}
+ * eg: oss
+ *  <pre>
+ *      storage.type = "oss"
+ *      fs.oss.accessKeyId = "your access key"
+ *      fs.oss.accessKeySecret = "your secret key"
+ *      fs.oss.endpoint = "such as: oss-cn-hangzhou.aliyuncs.com"
+ *      oss.bucket= "oss://your bucket"
+ *      fs.oss.credentials.provider = "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider"
+ *  </pre>
  */
 @AutoService(CheckpointStorageFactory.class)
 public class HdfsStorageFactory implements CheckpointStorageFactory {
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
index b85acdfef..7f53d920f 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
@@ -23,17 +23,18 @@ package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
 public enum FileConfiguration {
     LOCAL("local", new LocalConfiguration()),
     HDFS("hdfs", new HdfsConfiguration()),
-    S3("s3", new S3Configuration());
+    S3("s3", new S3Configuration()),
+    OSS("oss", new OssConfiguration());
 
     /**
      * file system type
      */
-    private String name;
+    private final String name;
 
     /**
      * file system configuration
      */
-    private AbstractConfiguration configuration;
+    private final AbstractConfiguration configuration;
 
     FileConfiguration(String name, AbstractConfiguration configuration) {
         this.name = name;
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
new file mode 100644
index 000000000..128092ecc
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+public class OssConfiguration extends AbstractConfiguration {
+
+    /**************** OSS required keys ***************/
+    public static final String OSS_BUCKET_KEY = "oss.bucket";
+
+
+    /* OSS constants */
+    private static final String OSS_IMPL_KEY = "fs.oss.impl";
+    private static final String HDFS_OSS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+    private static final String OSS_KEY = "fs.oss.";
+
+    @Override
+    public Configuration buildConfiguration(Map<String, String> config) {
+        checkConfiguration(config, OSS_BUCKET_KEY);
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
+        hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
+        setExtraConfiguration(hadoopConf, config, OSS_KEY);
+        return hadoopConf;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
new file mode 100644
index 000000000..fac6bcd71
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Disabled("OSS is not available in CI, if you want to run this test, please set up your own oss environment")
+public class OssFileCheckpointTest extends AbstractFileCheckPointTest {
+    @BeforeAll
+    public static void setup() throws CheckpointStorageException {
+        Map<String, String> config = new HashMap<>();
+        config.put("storage.type", "oss");
+        config.put("fs.oss.accessKeyId", "your access key id");
+        config.put("fs.oss.accessKeySecret", "your access key secret");
+        config.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        config.put("oss.bucket", "oss://seatunnel-test/");
+        config.put("fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+        STORAGE = new HdfsStorage(config);
+        initStorageData();
+    }
+}