You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "hailin0 (via GitHub)" <gi...@apache.org> on 2023/05/05 06:27:55 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4683: [Feature][SeaTunnel Engine] Add OSS to cluster-mode type

hailin0 commented on code in PR #4683:
URL: https://github.com/apache/incubator-seatunnel/pull/4683#discussion_r1185722842


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/OssWriter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.curator.shaded.com.google.common.io.ByteStreams;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class OssWriter implements IFileWriter<IMapFileData> {
+    FileSystem fs;
+    Path parentPath;
+    Path path;
+    Serializer serializer;
+
+    ByteBuf bf = Unpooled.buffer(1024);
+
+    // block size
+    long blockSize = 1024 * 1024;
+
+    AtomicLong index = new AtomicLong(0);

Review Comment:
   Please add the `private` modifier to these fields.



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file;
+
+import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.condition.OS.LINUX;
+import static org.junit.jupiter.api.condition.OS.MAC;
+
+@EnabledOnOs({LINUX, MAC})
+@Disabled
+public class IMapFileOSSStorageTest {
+
+    static String OSS_BUCKET_NAME = "oss://your bucket name/";
+    static String OSS_ENDPOINT = "your oss endpoint";
+    static String OSS_ACCESS_KEY_ID = "oss accessKey id";
+    static String OSS_ACCESS_KEY_SECRET = "oss accessKey secret";
+    private static final Configuration CONF;
+
+    private static final IMapFileStorage STORAGE;
+
+    static {
+        CONF = new Configuration();
+        CONF.set("storage.type", "oss");
+        CONF.set("fs.defaultFS", OSS_BUCKET_NAME);
+        CONF.set("fs.oss.endpoint", OSS_ENDPOINT);
+        CONF.set("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID);
+        CONF.set("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET);
+        CONF.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        CONF.set(
+                "fs.oss.credentials.provider",
+                "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+
+        STORAGE = new IMapFileStorage();
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("storage.type", "oss");
+        properties.put("oss.bucket", OSS_BUCKET_NAME);
+        properties.put("fs.oss.endpoint", OSS_ENDPOINT);
+        properties.put("fs.oss.accessKeyId", OSS_ACCESS_KEY_ID);
+        properties.put("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET);
+        properties.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+        properties.put(
+                "fs.oss.credentials.provider",
+                "org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider");
+        properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, "random");
+        properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, "/seatunnel-test/2");

Review Comment:
   Please extract `/seatunnel-test/2` into a variable.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -142,7 +143,7 @@ public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties prope
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
         scheduledExecutorService.scheduleAtFixedRate(
                 this::updateMetricsContextInImap,
-                0,
+                30 * 1000, // Wait for MapStore loading to complete

Review Comment:
   Is this initial delay (`30 * 1000`) too large?
   
   @ic4y 



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils;
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+public class HdfsWriter implements IFileWriter<IMapFileData> {
+
+    FSDataOutputStream out;
+
+    Serializer serializer;

Review Comment:
   ```suggestion
       private FSDataOutputStream out;
   
       private Serializer serializer;
   ```



##########
docs/en/seatunnel-engine/deployment.md:
##########
@@ -179,6 +179,7 @@ map:
            type: hdfs
            namespace: /tmp/seatunnel/imap
            clusterName: seatunnel-cluster
+           storage.type: hdfs

Review Comment:
   Is there a default value for `storage.type`, for compatibility with older versions?



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/IFileWriter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.seatunnel.engine.imap.storage.file.wal.writer;
+
+import org.apache.seatunnel.engine.serializer.api.Serializer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public interface IFileWriter<T> extends AutoCloseable {
+    String FILE_NAME = "wal.txt";

Review Comment:
   @EricJoy2048 @Hisoka-X please check



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.config;
+
+public enum FileConfiguration {
+    HDFS("hdfs", new HdfsConfiguration()),
+    S3("s3", new S3Configuration()),
+    OSS("oss", new OssConfiguration());
+
+    /** file system type */
+    private final String name;
+
+    /** file system configuration */
+    private final AbstractConfiguration configuration;
+
+    FileConfiguration(String name, AbstractConfiguration configuration) {
+        this.name = name;
+        this.configuration = configuration;
+    }
+
+    public AbstractConfiguration getConfiguration(String name) {

Review Comment:
   Is the `name` parameter unused?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org