Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/18 16:01:54 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer opened a new pull request, #2467: [Feature][Connector-V2] Add oss source connector

TyrantLucifer opened a new pull request, #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
#1946 OSS source connector
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
* [x] Code changes are covered with tests, or do not need tests for this reason:
* [ ] If any new JAR binary packages are added in your PR, please add a License Notice according to the
  [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r949827092


##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
@@ -0,0 +1,997 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.fs;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_USER_AGENT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DELETE_OBJECTS_ONETIME_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_DOMAIN;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_HOST;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PORT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_WORKSTATION;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.MAX_RETURNED_KEYS_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.OSS_DEFAULT_BLOCK_SIZE;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSErrorCode;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.event.ProgressEvent;
+import com.aliyun.oss.event.ProgressListener;
+import com.aliyun.oss.model.CopyObjectRequest;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * Hadoop File System implementation for Aliyun OSS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class OSSFileSystem extends FileSystem {
+
+    private URI uri;
+    private Path workingDir;
+    private String bucket;
+    private OSSClient client;
+
+    public static final Logger LOG = LoggerFactory.getLogger(OSSFileSystem.class);
+
+    /**
+     * Called after a new FileSystem instance is constructed.
+     *
+     * @param name URI
+     * @param conf configuration
+     * @throws IOException IOException
+     */
+    public void initialize(URI name, Configuration conf) throws IOException {
+        super.initialize(name, conf);
+
+        uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+        workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
+                this.getWorkingDirectory());
+
+        // Try to get our credentials or just connect anonymously
+        String accessKeyId = conf.get(SmartOSSClientConfig.HADOOP_ACCESS_KEY, null);
+        String accessKeySecret = conf.get(SmartOSSClientConfig.HADOOP_SECRET_KEY, null);
+        String endpoint = conf.get(SmartOSSClientConfig.HADOOP_ENDPOINT, null);
+
+        bucket = name.getHost();
+
+        // Initialize OSS Client, please refer to help.aliyun.com/document_detail/oss/sdk/java-sdk/init.html
+        // for the detailed information.
+        SmartOSSClientConfig ossConf = new SmartOSSClientConfig();
+        //user agent
+        ossConf.setUserAgent(conf.getTrimmed(HADOOP_PROXY_HOST, DEFAULT_USER_AGENT));
+        //connect to oss through a proxy server
+        String proxyHost = conf.getTrimmed(HADOOP_PROXY_HOST, "");
+        int proxyPort = conf.getInt(HADOOP_PROXY_PORT, -1);
+        String proxyUsername = conf.getTrimmed(HADOOP_PROXY_USERNAME);
+        String proxyPassword = conf.getTrimmed(HADOOP_PROXY_PASSWORD);
+        if (!proxyHost.isEmpty() && proxyPort >= 0) {
+            ossConf.setProxyHost(proxyHost);
+            ossConf.setProxyPort(proxyPort);
+        }
+        if (proxyUsername != null) {
+            ossConf.setProxyUsername(proxyUsername);
+        }
+        if (proxyPassword != null) {
+            ossConf.setProxyPassword(proxyPassword);
+        }
+        ossConf.setProxyDomain(conf.getTrimmed(HADOOP_PROXY_DOMAIN));
+        ossConf.setProxyWorkstation(conf.getTrimmed(HADOOP_PROXY_WORKSTATION));
+        //MaxConnections
+        ossConf.setMaxConnections(conf.getInt(HADOOP_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
+        //SocketTimeout
+        ossConf.setSocketTimeout(conf.getInt(HADOOP_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT));
+        //ConnectionTimeout
+        ossConf.setConnectionTimeout(conf.getInt(HADOOP_ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT));
+        //MaxErrorRetry
+        ossConf.setMaxErrorRetry(conf.getInt(HADOOP_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES));
+        //Protocol
+        boolean secureConnections = conf.getBoolean(HADOOP_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
+        ossConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+        //SupportCname
+        ossConf.setSupportCname(false);  // ListBuckets cannot be used when CNAME is enabled.
+        ossConf.setSLDEnabled(false);
+
+        //extra configuration for multiple part copy/upload
+        ossConf.setMultipartUploadThreshold(conf.getLong(HADOOP_MULTIPART_UPLOAD_THRESHOLD, DEFAULT_MULTIPART_UPLOAD_THRESHOLD));
+        ossConf.setMinimumUploadPartSize(conf.getLong(HADOOP_MULTIPART_UPLOAD_PART_SIZE, DEFAULT_MINIMUM_UPLOAD_PART_SIZE));
+        ossConf.setMultipartCopyThreshold(conf.getLong(HADOOP_MULTIPART_COPY_THRESHOLD, DEFAULT_MULTIPART_COPY_THRESHOLD));
+        ossConf.setMultipartCopyPartSize(conf.getLong(HADOOP_MULTIPART_COPY_PART_SIZE, DEFAULT_MINIMUM_COPY_PART_SIZE));
+        // extra configuration for multiple part copy/upload thread pool
+        ossConf.setCorePoolSize(conf.getInt(HADOOP_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE));
+        ossConf.setMaxPoolSize(conf.getInt(HADOOP_MAX_POOL_SIZE, DEFAULT_MAX_POOL_SIZE));
+        ossConf.setKeepAliveTime(conf.getInt(HADOOP_KEEP_ALIVE_TIME, DEFAULT_KEEP_ALIVE_TIME));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                            "domain {} as workstation {}", ossConf.getProxyHost(),
+                    ossConf.getProxyPort(), ossConf.getProxyUsername(),
+                    ossConf.getProxyPassword(), ossConf.getProxyDomain(),
+                    ossConf.getProxyWorkstation());
+        }
+
+        client = new SmartOSSClient(endpoint, accessKeyId, accessKeySecret, ossConf);
+        if (!client.doesBucketExist(bucket)) {
+            throw new IOException("Bucket " + bucket + " does not exist");
+        }
+        setConf(conf);
+
+    }
+
+    /**
+     * Return the protocol scheme for the FileSystem.
+     *
+     * @return "oss"
+     */
+    public String getScheme() {
+        return "oss";
+    }
+
+    /**
+     * Returns a URI whose scheme and authority identify this FileSystem.
+     */
+    public URI getUri() {
+        return uri;
+    }
+
+    /**
+     * Returns the OSS client used by this filesystem.
+     *
+     * @return oss client
+     */
+    OSSClient getOSSClient() {
+        return client;
+    }
+
+    public OSSFileSystem() {
+        super();
+    }
+
+    /**
+     * Turns a path (relative or otherwise) into an OSS key
+     *
+     * @param path path
+     * @return the OSS key for the given path
+     */
+    private String pathToKey(Path path) {

Review Comment:
   This method is called by `create`, so `path` must not be empty; I think it's useless here too. What do you think about it?
   
   ![image](https://user-images.githubusercontent.com/51053924/185552976-3155b87a-1bb3-4078-a81e-725009724b2a.png)
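
   For context, a self-contained sketch of the mapping `pathToKey` performs (its body is quoted later in this thread); the class name, working directory, and sample paths here are illustrative, not from the PR:

   ```java
   import org.apache.hadoop.fs.Path;

   public class PathToKeyDemo {
       // Illustrative working directory; the real one is derived from the user name.
       private static final Path WORKING_DIR = new Path("/user/seatunnel");

       // Mirrors the diff: resolve relative paths against the working directory,
       // map a bare bucket URI to the empty key, and strip the leading '/' so the
       // result is a bucket-relative OSS object key.
       static String pathToKey(Path path) {
           if (!path.isAbsolute()) {
               path = new Path(WORKING_DIR, path);
           }
           if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
               return "";
           }
           return path.toUri().getPath().substring(1);
       }

       public static void main(String[] args) {
           System.out.println(pathToKey(new Path("oss://bucket/dir/file.txt"))); // dir/file.txt
           System.out.println(pathToKey(new Path("data/file.txt")));             // user/seatunnel/data/file.txt
       }
   }
   ```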
   





[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#issuecomment-1219670044

   @CalvinKirs pls review, thx.




[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r952124268


##########
pom.xml:
##########
@@ -534,6 +535,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-aliyun</artifactId>
+                <version>${hadoop-aliyun.version}</version>
+            </dependency>

Review Comment:
   I mean: does this dependency have version requirements for Hadoop?
    https://github.com/apache/incubator-seatunnel/pull/2467/files#diff-d9cdd3b0000fbae0ab6febacb4bc6acc28fd71d4fefb238c058ba3c532f7f43bR42
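
   For illustration, one way to keep the two aligned is to derive the connector's version property from a single Hadoop property in the root `pom.xml`. `hadoop-aliyun.version` is the property used in the diff; `hadoop.version` and the `2.9.2` value are assumptions for this sketch. Since `hadoop-aliyun` is released as part of Apache Hadoop, its version normally has to match the Hadoop release line the job runs against:

   ```xml
   <properties>
       <!-- hadoop-aliyun ships with Apache Hadoop, so pin it to the same release line -->
       <hadoop.version>2.9.2</hadoop.version>
       <hadoop-aliyun.version>${hadoop.version}</hadoop-aliyun.version>
   </properties>
   ```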
   
   





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r949756944


##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
+    /**
+     * Called after a new FileSystem instance is constructed.
+     *
+     * @param name URI
+     * @param conf configuration
+     * @throws IOException IOException
+     */
+    public void initialize(URI name, Configuration conf) throws IOException {

Review Comment:
   Please use `@Nonnull` to mark the parameter if it cannot be null.
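
   For example, a minimal sketch of the suggested change (assuming the JSR-305 `javax.annotation` annotations are available in this module; only the signature changes, the body stays as in the diff):

   ```java
   import javax.annotation.Nonnull;

   // Document the contract on the signature rather than adding a runtime check:
   // FileSystem.initialize is always invoked with a concrete URI and Configuration.
   public void initialize(@Nonnull URI name, @Nonnull Configuration conf) throws IOException {
       super.initialize(name, conf);
       // ... rest of the method unchanged ...
   }
   ```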



##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
+    /**
+     * Turns a path (relative or otherwise) into an OSS key
+     *
+     * @param path path
+     * @return the OSS key for the given path
+     */
+    private String pathToKey(Path path) {

Review Comment:
   As above





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r951419030


##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
+    /**
+     * Turns a path (relative or otherwise) into an OSS key
+     *
+     * @param path path
+     * @return the OSS key for the given path
+     */
+    private String pathToKey(Path path) {
+        if (!path.isAbsolute()) {
+            path = new Path(workingDir, path);
+        }
+        if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+            return "";
+        }
+        return path.toUri().getPath().substring(1);
+    }
+
+    private Path keyToPath(String key) {
+        return new Path("/" + key);
+    }
+
+    /**
+     * Opens an FSDataInputStream at the indicated Path.
+     *
+     * @param f          the file name to open
+     * @param bufferSize the size of the buffer to be used.
+     */
+    public FSDataInputStream open(Path f, int bufferSize)
+            throws IOException {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening '{}' for reading.", f);
+        }
+        final FileStatus fileStatus = getFileStatus(f);
+        if (fileStatus.isDirectory()) {
+            throw new FileNotFoundException("Can't open " + f + " because it is a directory");
+        }
+
+        return new FSDataInputStream(new OSSInputStream(bucket, pathToKey(f),
+                fileStatus.getLen(), client, statistics));
+    }
+
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+                                     int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+        String key = pathToKey(f);
+
+        if (!overwrite && exists(f)) {
+            throw new FileAlreadyExistsException(f + " already exists");
+        }
+        // We pass null to FSDataOutputStream, so it won't count writes that are being buffered to a file
+        return new FSDataOutputStream(new OSSOutputStream(getConf(), this,
+                bucket, key, progress, statistics), null);
+    }
+
+    public FSDataOutputStream append(Path f, int bufferSize,
+                                     Progressable progress) throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    public boolean rename(Path src, Path dst) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rename path {} to {}", src, dst);
+        }
+
+        String srcKey = pathToKey(src);
+        String dstKey = pathToKey(dst);
+
+        if (srcKey.isEmpty() || dstKey.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("rename: src or dst are empty");
+            }
+            return false;
+        }
+
+        OSSFileStatus srcStatus;
+        try {
+            srcStatus = getFileStatus(src);
+        } catch (FileNotFoundException e) {
+            LOG.error("rename: src not found {}", src);
+            return false;
+        }
+
+        if (srcKey.equals(dstKey)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("rename: src and dst refer to the same file or directory");
+            }
+            return srcStatus.isFile();
+        }
+
+        OSSFileStatus dstStatus = null;
+        try {
+            dstStatus = getFileStatus(dst);
+
+            if (srcStatus.isDirectory() && dstStatus.isFile()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("rename: src is a directory and dst is a file");
+                }
+                return false;
+            }
+
+            if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
+                return false;
+            }
+        } catch (FileNotFoundException e) {
+            // Parent must exist
+            Path parent = dst.getParent();
+            if (!pathToKey(parent).isEmpty()) {
+                try {
+                    OSSFileStatus dstParentStatus = getFileStatus(dst.getParent());
+                    if (!dstParentStatus.isDirectory()) {
+                        return false;
+                    }
+                } catch (FileNotFoundException e2) {
+                    return false;
+                }
+            }
+        }
+
+        // Ok! Time to start
+        if (srcStatus.isFile()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("rename: renaming file " + src + " to " + dst);
+            }
+            if (dstStatus != null && dstStatus.isDirectory()) {
+                String newDstKey = dstKey;
+                if (!newDstKey.endsWith("/")) {
+                    newDstKey = newDstKey + "/";
+                }
+                String filename =
+                        srcKey.substring(pathToKey(src.getParent()).length() + 1);
+                newDstKey = newDstKey + filename;
+                copyFile(srcKey, newDstKey);
+            } else {
+                copyFile(srcKey, dstKey);
+            }
+            delete(src, false);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("rename: renaming directory " + src + " to " + dst);
+            }
+
+            // This is a directory to directory copy
+            if (!dstKey.endsWith("/")) {
+                dstKey = dstKey + "/";
+            }
+
+            if (!srcKey.endsWith("/")) {
+                srcKey = srcKey + "/";
+            }
+
+            //Verify dest is not a child of the source directory
+            if (dstKey.startsWith(srcKey)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("cannot rename a directory to a subdirectory of self");
+                }
+                return false;
+            }
+
+            List<String> keysToDelete =
+                    new ArrayList<String>();
+            if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+                // delete unnecessary fake directory.
+                keysToDelete.add(dstKey);
+            }
+
+            ListObjectsRequest request = new ListObjectsRequest();
+            request.setBucketName(bucket);
+            request.setPrefix(srcKey);
+            request.setMaxKeys(MAX_RETURNED_KEYS_LIMIT);
+
+            ObjectListing objects = client.listObjects(request);
+            statistics.incrementReadOps(1);
+
+            String nextMarker;
+            while (true) {
+                for (OSSObjectSummary summary : objects.getObjectSummaries()) {
+                    keysToDelete.add(summary.getKey());
+                    String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+                    copyFile(summary.getKey(), newDstKey);
+
+                    if (keysToDelete.size() == DELETE_OBJECTS_ONETIME_LIMIT) {
+                        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
+                        client.deleteObjects(deleteRequest);
+                        statistics.incrementWriteOps(1);
+                        keysToDelete.clear();
+                    }
+                }
+
+                if (objects.isTruncated()) {
+                    nextMarker = objects.getNextMarker();
+                    objects = client.listObjects(request.withMarker(nextMarker));
+                    statistics.incrementReadOps(1);
+                } else {
+                    if (!keysToDelete.isEmpty()) {
+                        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
+                        client.deleteObjects(deleteRequest);
+                        statistics.incrementWriteOps(1);
+                    }
+                    break;
+                }
+            }
+        }
+
+        if (src.getParent() != dst.getParent()) {
+            deleteUnnecessaryEmptyDirectories(dst.getParent());
+            createEmptyDirectoryIfNecessary(src.getParent());
+        }
+        return true;
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Delete path " + f + " - recursive " + recursive);
+        }
+        OSSFileStatus status;
+        try {
+            status = getFileStatus(f);
+        } catch (FileNotFoundException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Couldn't delete " + f + " - does not exist");
+            }
+            return false;
+        }
+
+        String key = pathToKey(f);
+
+        if (status.isDirectory()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("delete: Path is a directory");
+            }
+
+            if (!recursive && !status.isEmptyDirectory()) {
+                throw new IOException("Path is a folder: " + f + " and it is not an empty directory");
+            }
+
+            if (!key.endsWith("/")) {
+                key = key + "/";
+            }
+
+            if (key.equals("/")) {
+                LOG.info("oss cannot delete the root directory");
+                return false;
+            }
+
+            if (status.isEmptyDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Deleting empty directory");
+                }
+                client.deleteObject(bucket, key);
+                statistics.incrementWriteOps(1);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Getting objects for directory prefix " + key + " to delete");
+                }
+
+                ListObjectsRequest request = new ListObjectsRequest();
+                request.setBucketName(bucket);
+                request.setPrefix(key);
+                // No delimiter is set, so the listing returns every key under the prefix
+                request.setMaxKeys(MAX_RETURNED_KEYS_LIMIT);
+
+                List<String> keys = new ArrayList<String>();
+                ObjectListing objects = client.listObjects(request);
+                statistics.incrementReadOps(1);
+
+                String nextMarker;
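+                // Page through the listing, deleting keys in batches of DELETE_OBJECTS_ONETIME_LIMIT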
+                while (true) {
+                    for (OSSObjectSummary summary : objects.getObjectSummaries()) {
+                        keys.add(summary.getKey());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Got object to delete " + summary.getKey());
+                        }
+
+                        if (keys.size() == DELETE_OBJECTS_ONETIME_LIMIT) {
+                            DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keys);
+                            client.deleteObjects(deleteRequest);
+                            statistics.incrementWriteOps(1);
+                            keys.clear();
+                        }
+                    }
+
+                    if (objects.isTruncated()) {
+                        nextMarker = objects.getNextMarker();
+                        objects = client.listObjects(request.withMarker(nextMarker));
+                        statistics.incrementReadOps(1);
+                    } else {
+                        if (!keys.isEmpty()) {
+                            DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keys);
+                            client.deleteObjects(deleteRequest);
+                            statistics.incrementWriteOps(1);
+                        }
+                        break;
+                    }
+                }
+            }
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("delete: Path is a file");
+            }
+            client.deleteObject(bucket, key);
+            statistics.incrementWriteOps(1);
+        }
+
+        createEmptyDirectoryIfNecessary(f.getParent());
+
+        return true;
+    }
+
+    private void createEmptyDirectoryIfNecessary(Path f) throws IOException {
+        String key = pathToKey(f);
+        if (!key.isEmpty() && !exists(f)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating new empty directory at " + f);
+            }
+            createEmptyDirectory(bucket, key);
+        }
+    }
+
+    /**
+     * List the statuses of the files/directories in the given path if the path is
+     * a directory.
+     *
+     * @param f given path
+     * @return the statuses of the files/directories in the given path
+     * @throws FileNotFoundException when the path does not exist;
+     *                               IOException see specific implementation
+     */
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+            IOException {
+        String key = pathToKey(f);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("List status for path: " + f);
+        }
+
+        final List<FileStatus> result = new ArrayList<FileStatus>();
+        final FileStatus fileStatus = getFileStatus(f);
+
+        if (fileStatus.isDirectory()) {
+            if (!key.isEmpty()) {
+                key = key + "/";
+            }
+
+            ListObjectsRequest request = new ListObjectsRequest();
+            request.setBucketName(bucket);
+            request.setPrefix(key);
+            request.setDelimiter("/");
+            request.setMaxKeys(MAX_RETURNED_KEYS_LIMIT);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("listStatus: doing listObjects for directory " + key);
+            }
+
+            ObjectListing objects = client.listObjects(request);
+            statistics.incrementReadOps(1);
+
+            String nextMarker;
+            while (true) {
+                for (OSSObjectSummary summary : objects.getObjectSummaries()) {
+                    Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
+                    if (keyPath.equals(f)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ignoring: " + keyPath);
+                        }
+                        continue;
+                    }
+
+                    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+                        result.add(new OSSFileStatus(true, true, keyPath));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Adding: fd: " + keyPath);
+                        }
+                    } else {
+                        result.add(new OSSFileStatus(summary.getSize(),
+                                dateToLong(summary.getLastModified()), keyPath,
+                                getDefaultBlockSize(f.makeQualified(uri, workingDir))));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Adding: fi: " + keyPath);
+                        }
+                    }
+                }
+
+                for (String prefix : objects.getCommonPrefixes()) {
+                    Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
+                    if (keyPath.equals(f)) {
+                        continue;
+                    }
+                    result.add(new OSSFileStatus(true, false, keyPath));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Adding: rd: " + keyPath);
+                    }
+                }
+
+                if (objects.isTruncated()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("listStatus: list truncated - getting next batch");
+                    }
+                    nextMarker = objects.getNextMarker();
+                    objects = client.listObjects(request.withMarker(nextMarker));
+                    statistics.incrementReadOps(1);
+                } else {
+                    break;
+                }
+            }
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding: rd (not a dir): " + f);
+            }
+            result.add(fileStatus);
+        }
+
+        return result.toArray(new FileStatus[result.size()]);
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+        workingDir = newDir;
+    }
+
+    /**
+     * Get the current working directory for the given file system
+     *
+     * @return the directory pathname
+     */
+    @Override
+    public Path getWorkingDirectory() {
+        return workingDir;
+    }
+
+    /**
+     * Make the given file and all non-existent parents into
+     * directories. Has the semantics of Unix 'mkdir -p'.
+     * Existence of the directory hierarchy is not an error.
+     *
+     * @param f          path to create
+     * @param permission to apply to f
+     */
+    // TODO: If we have created an empty file at /foo/bar and we then call
+    // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Making directory: " + f);
+        }
+        try {
+            FileStatus fileStatus = getFileStatus(f);
+
+            if (fileStatus.isDirectory()) {
+                return true;
+            } else {
+                throw new FileAlreadyExistsException("Path is a file: " + f);
+            }
+        } catch (FileNotFoundException e) {
+
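+            // Walk up the ancestors to make sure none of them is an existing file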
+            Path fPart = f;
+            do {
+                try {
+                    FileStatus fileStatus = getFileStatus(fPart);
+                    if (fileStatus.isFile()) {
+                        throw new FileAlreadyExistsException(String.format(
+                                "Can't make directory for path '%s' since it is a file.",
+                                fPart));
+                    }
+                } catch (FileNotFoundException fileNotFoundException) {
+                    LOG.error(fileNotFoundException.getMessage());
+                }
+                fPart = fPart.getParent();
+            } while (fPart != null);
+
+            String key = pathToKey(f);
+            createEmptyDirectory(bucket, key);
+            return true;
+        }
+    }
+
+    /**
+     * Return a file status object that represents the path.
+     *
+     * @param f The path we want information from
+     * @return a FileStatus object
+     * @throws FileNotFoundException when the path does not exist;
+     *                               IOException see specific implementation
+     */
+    @Override
+    public OSSFileStatus getFileStatus(Path f) throws IOException {
+        String key = pathToKey(f);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Getting path status for " + f + " (" + key + ")");
+        }
+
+        // First, try to fetch the object metadata directly
+        if (!key.isEmpty()) {
+            try {
+                ObjectMetadata fileMetadata = client.getObjectMetadata(bucket, key);
+                statistics.incrementReadOps(1);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Found exact file: normal file");
+                }
+                return new OSSFileStatus(fileMetadata.getContentLength(),
+                        dateToLong(fileMetadata.getLastModified()),
+                        f.makeQualified(uri, workingDir),
+                        getDefaultBlockSize(f.makeQualified(uri, workingDir)));
+            } catch (OSSException e) {
+                if (!e.getErrorCode().equals(OSSErrorCode.NO_SUCH_KEY)) {
+                    LOG.error(e.getMessage());
+                    throw e;
+                }
+            } catch (ClientException e) {
+                LOG.error(e.getMessage());
+                throw e;
+            }
+        }
+
+        // Next, append '/' and probe for an empty directory marker object
+        if (!key.isEmpty() && !key.endsWith("/")) {
+            try {
+                String newKey = key + "/";
+                ObjectMetadata meta = client.getObjectMetadata(bucket, newKey);
+                statistics.incrementReadOps(1);
+
+                if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Found file (with /): fake directory");
+                    }
+                    return new OSSFileStatus(true, true, f.makeQualified(uri, workingDir));
+                }
+            } catch (OSSException e) {
+                if (!e.getErrorCode().equals(OSSErrorCode.NO_SUCH_KEY)) {
+                    LOG.error(e.getMessage());
+                    throw e;
+                }
+            } catch (ClientException e) {
+                LOG.error(e.getMessage());
+                throw e;
+            }
+        }
+
+        // Finally, append '/' and list the prefix to detect a non-empty directory
+        try {
+            if (!key.isEmpty() && !key.endsWith("/")) {
+                key = key + "/";
+            }
+            ListObjectsRequest request = new ListObjectsRequest();
+            request.setBucketName(bucket);
+            request.setPrefix(key);
+            request.setDelimiter("/");
+            request.setMaxKeys(1);
+
+            ObjectListing objects = client.listObjects(request);
+            statistics.incrementReadOps(1);
+
+            if (!objects.getCommonPrefixes().isEmpty()
+                    || !objects.getObjectSummaries().isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Found path as directory (with /): " +
+                            objects.getCommonPrefixes().size() + "/" +
+                            objects.getObjectSummaries().size());
+
+                    for (OSSObjectSummary summary : objects.getObjectSummaries()) {
+                        LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
+                    }
+                    for (String prefix : objects.getCommonPrefixes()) {
+                        LOG.debug("Prefix: " + prefix);
+                    }
+                }
+                return new OSSFileStatus(true, false, f.makeQualified(uri, workingDir));
+            } else if (key.isEmpty()) {
+                LOG.debug("Found root directory");
+                return new OSSFileStatus(true, true, f.makeQualified(uri, workingDir));
+            }
+        } catch (OSSException e) {
+            if (!e.getErrorCode().equals(OSSErrorCode.NO_SUCH_KEY)) {
+                LOG.error(e.getMessage());
+                throw e;
+            }
+        } catch (ClientException e) {
+            LOG.error(e.getMessage());
+            throw e;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Not Found: " + f);
+        }
+        throw new FileNotFoundException("No such file or directory: " + f);
+
+    }
+
+    /**
+     * The src file is on the local disk.  Add it to FS at
+     * the given dst name.
+     * <p/>
+     * This version doesn't need to create a temporary file to calculate the md5.
+     * Sadly this doesn't seem to be used by the shell cp :(
+     * <p/>
+     * delSrc indicates if the source should be removed
+     *
+     * @param delSrc    whether to delete the src
+     * @param overwrite whether to overwrite an existing file
+     * @param src       path
+     * @param dst       path
+     */
+    @Override
+    public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
+                                  Path dst) throws IOException {
+        String key = pathToKey(dst);
+
+        if (!overwrite && exists(dst)) {
+            throw new IOException(dst + " already exists");
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Copying local file from " + src + " to " + dst);
+        }
+
+        // Since we have a local file, we don't need to stream into a temporary file
+        LocalFileSystem local = getLocal(getConf());
+        File srcfile = local.pathToFile(src);
+
+        final ObjectMetadata om = new ObjectMetadata();
+        PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
+        putObjectRequest.setMetadata(om);
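+        // Count each completed multipart piece as a write operation in the statistics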
+        putObjectRequest.setProgressListener(new ProgressListener() {
+            public void progressChanged(ProgressEvent progressEvent) {
+                switch (progressEvent.getEventType()) {
+                    case TRANSFER_PART_COMPLETED_EVENT:
+                        statistics.incrementWriteOps(1);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        try {
+            client.putObject(putObjectRequest);
+            statistics.incrementWriteOps(1);
+        } catch (OSSException | ClientException e) {
+            throw new IOException("Got interrupted, cancelling");
+        }
+        // This will delete unnecessary fake parent directories
+        finishedWrite(key);
+        if (delSrc) {
+            local.delete(src, false);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            // Shutdown the instance to release any allocated resources
+            if (client != null) {
+                client.shutdown();
+                client = null;
+            }
+        }
+    }
+
+    /**
+     * Override getCanonicalServiceName because we don't support token in OSS
+     */
+    @Override
+    public String getCanonicalServiceName() {
+        // Does not support Token
+        return null;
+    }
+
+    private void copyFile(String srcKey, String dstKey) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("copyFile " + srcKey + " -> " + dstKey);
+        }
+
+        ObjectMetadata om = client.getObjectMetadata(bucket, srcKey);
+        CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+        copyObjectRequest.setNewObjectMetadata(om);
+        copyObjectRequest.setProgressListener(new ProgressListener() {
+            public void progressChanged(ProgressEvent progressEvent) {
+                switch (progressEvent.getEventType()) {
+                    case TRANSFER_PART_COMPLETED_EVENT:
+                        statistics.incrementWriteOps(1);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+        try {
+            client.copyObject(copyObjectRequest);
+            statistics.incrementWriteOps(1);
+        } catch (OSSException | ClientException e) {
+            throw new IOException("Got interrupted, cancelling");
+        }
+    }
+
+    public void finishedWrite(String key) throws IOException {
+        deleteUnnecessaryEmptyDirectories(keyToPath(key).getParent());
+    }
+
+    private void deleteUnnecessaryEmptyDirectories(Path f) throws IOException {
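+        // Walk up the parent chain and remove any fake directory objects that are now empty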
+        while (true) {
+            try {
+                String key = pathToKey(f);
+                if (key.isEmpty()) {
+                    break;
+                }
+
+                OSSFileStatus status = getFileStatus(f);
+
+                if (status.isDirectory() && status.isEmptyDirectory()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Deleting fake directory " + key + "/");
+                    }
+                    client.deleteObject(bucket, key + "/");
+                    statistics.incrementWriteOps(1);
+                }
+            } catch (FileNotFoundException | OSSException e) {
+                throw new IOException("Got interrupted, cancelling");
+            }
+
+            if (f.isRoot()) {
+                break;
+            }
+
+            f = f.getParent();
+        }
+    }
+
+    private void createEmptyDirectory(final String bucketName, final String objectName)
+            throws ClientException, OSSException {
+        if (!objectName.endsWith("/")) {
+            createEmptyObject(bucketName, objectName + "/");
+        } else {
+            createEmptyObject(bucketName, objectName);
+        }
+    }
+
+    private void createEmptyObject(final String bucketName, final String objectName)
+            throws ClientException, OSSException {
+
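+        // Zero-byte stream that serves as the body of the directory marker object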
+        final InputStream nullStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return -1;
+            }
+        };
+
+        final ObjectMetadata om = new ObjectMetadata();
+        om.setContentLength(0L);
+
+        PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, nullStream, om);
+        client.putObject(putObjectRequest);
+        statistics.incrementWriteOps(1);
+    }
+
+    /**
+     * Return the number of bytes into which large input files should
+     * optimally be split to minimize I/O time.
+     *
+     * @deprecated use {@link #getDefaultBlockSize(Path)} instead
+     */
+    @Deprecated
+    public long getDefaultBlockSize() {

Review Comment:
   The deprecated method should be replaced with the getDefaultBlockSize(Path) overload.
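   
   As an illustration, a minimal sketch of the replacement (a sketch only, not code from the PR), assuming the default should stay the OSS_DEFAULT_BLOCK_SIZE constant already imported from SmartOSSClientConfig:
   
       @Override
       public long getDefaultBlockSize(Path f) {
           // Sketch only: mirror whatever the deprecated no-arg variant returns
           return OSS_DEFAULT_BLOCK_SIZE;
       }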





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#issuecomment-1220383449

   Can you add test cases to the Flink and Spark e2e modules?




[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r951429249


##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
@@ -0,0 +1,997 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.fs;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_USER_AGENT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DELETE_OBJECTS_ONETIME_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_DOMAIN;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_HOST;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PORT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_WORKSTATION;

Review Comment:
   Hi, where does this file come from? I found what seems to be the exact same file elsewhere, but it's not clear whether that is the original source.



##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
@@ -0,0 +1,997 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.fs;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_USER_AGENT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DELETE_OBJECTS_ONETIME_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_DOMAIN;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_HOST;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PORT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_WORKSTATION;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.MAX_RETURNED_KEYS_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.OSS_DEFAULT_BLOCK_SIZE;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSErrorCode;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.event.ProgressEvent;
+import com.aliyun.oss.event.ProgressListener;
+import com.aliyun.oss.model.CopyObjectRequest;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * Hadoop File System implementation for Aliyun OSS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class OSSFileSystem extends FileSystem {
+
+    private URI uri;
+    private Path workingDir;
+    private String bucket;
+    private OSSClient client;
+
+    public static final Logger LOG = LoggerFactory.getLogger(OSSFileSystem.class);
+
+    /**
+     * Called after a new FileSystem instance is constructed.
+     *
+     * @param name URI
+     * @param conf configuration
+     * @throws IOException IOException
+     */
+    public void initialize(URI name, Configuration conf) throws IOException {
+        super.initialize(name, conf);
+
+        uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+        workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
+                this.getWorkingDirectory());
+
+        // Try to get our credentials or just connect anonymously
+        String accessKeyId = conf.get(SmartOSSClientConfig.HADOOP_ACCESS_KEY, null);
+        String accessKeySecret = conf.get(SmartOSSClientConfig.HADOOP_SECRET_KEY, null);
+        String endpoint = conf.get(SmartOSSClientConfig.HADOOP_ENDPOINT, null);
+
+        bucket = name.getHost();
+
+        // Initialize OSS Client, please refer to help.aliyun.com/document_detail/oss/sdk/java-sdk/init.html
+        // for the detailed information.
+        SmartOSSClientConfig ossConf = new SmartOSSClientConfig();
+        //user agent
+        ossConf.setUserAgent(conf.getTrimmed(HADOOP_PROXY_HOST, DEFAULT_USER_AGENT));
+        //connect to oss through a proxy server
+        String proxyHost = conf.getTrimmed(HADOOP_PROXY_HOST, "");
+        int proxyPort = conf.getInt(HADOOP_PROXY_PORT, -1);
+        String proxyUsername = conf.getTrimmed(HADOOP_PROXY_USERNAME);
+        String proxyPassword = conf.getTrimmed(HADOOP_PROXY_PASSWORD);
+        if (!proxyHost.isEmpty() && proxyPort >= 0) {
+            ossConf.setProxyHost(proxyHost);
+            ossConf.setProxyPort(proxyPort);
+        }
+        if (proxyUsername != null) {
+            ossConf.setProxyUsername(proxyUsername);
+        }
+        if (proxyPassword != null) {
+            ossConf.setProxyPassword(proxyPassword);
+        }
+        ossConf.setProxyDomain(conf.getTrimmed(HADOOP_PROXY_DOMAIN));
+        ossConf.setProxyWorkstation(conf.getTrimmed(HADOOP_PROXY_WORKSTATION));
+        //MaxConnections
+        ossConf.setMaxConnections(conf.getInt(HADOOP_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
+        //SocketTimeout
+        ossConf.setSocketTimeout(conf.getInt(HADOOP_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT));
+        //ConnectionTimeout
+        ossConf.setConnectionTimeout(conf.getInt(HADOOP_ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT));
+        //MaxErrorRetry
+        ossConf.setMaxErrorRetry(conf.getInt(HADOOP_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES));
+        //Protocol
+        boolean secureConnections = conf.getBoolean(HADOOP_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
+        ossConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+        //SupportCname
+        ossConf.setSupportCname(false);  //ListBuckets can not be used when enable CNAME.
+        ossConf.setSLDEnabled(false);
+
+        //extra configuration for multiple part copy/upload
+        ossConf.setMultipartUploadThreshold(conf.getLong(HADOOP_MULTIPART_UPLOAD_THRESHOLD, DEFAULT_MULTIPART_UPLOAD_THRESHOLD));
+        ossConf.setMinimumUploadPartSize(conf.getLong(HADOOP_MULTIPART_UPLOAD_PART_SIZE, DEFAULT_MINIMUM_UPLOAD_PART_SIZE));
+        ossConf.setMultipartCopyThreshold(conf.getLong(HADOOP_MULTIPART_COPY_THRESHOLD, DEFAULT_MULTIPART_COPY_THRESHOLD));
+        ossConf.setMultipartCopyPartSize(conf.getLong(HADOOP_MULTIPART_COPY_PART_SIZE, DEFAULT_MINIMUM_COPY_PART_SIZE));
+        // extra configuration for multiple part copy/upload thread pool
+        ossConf.setCorePoolSize(conf.getInt(HADOOP_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE));
+        ossConf.setMaxPoolSize(conf.getInt(HADOOP_MAX_POOL_SIZE, DEFAULT_MAX_POOL_SIZE));
+        ossConf.setKeepAliveTime(conf.getInt(HADOOP_KEEP_ALIVE_TIME, DEFAULT_KEEP_ALIVE_TIME));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                            "domain {} as workstation {}", ossConf.getProxyHost(),
+                    ossConf.getProxyPort(), ossConf.getProxyUsername(),
+                    ossConf.getProxyPassword(), ossConf.getProxyDomain(),
+                    ossConf.getProxyWorkstation());
+        }
+
+        client = new SmartOSSClient(endpoint, accessKeyId, accessKeySecret, ossConf);
+        if (!client.doesBucketExist(bucket)) {
+            throw new IOException("Bucket " + bucket + " does not exist");
+        }
+        setConf(conf);
+
+    }
+
+    /**
+     * Return the protocol scheme for the FileSystem.
+     *
+     * @return "oss"
+     */
+    public String getScheme() {
+        return "oss";
+    }

Review Comment:
   This file is missing the '@Override' annotation on a number of methods; please check again.
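   
   For example, a sketch of the fix on one of the affected methods (getScheme overrides FileSystem.getScheme, so it should carry the annotation):
   
       @Override
       public String getScheme() {
           return "oss";
       }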





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r952171130


##########
pom.xml:
##########
@@ -534,6 +535,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-aliyun</artifactId>
+                <version>${hadoop-aliyun.version}</version>
+            </dependency>

Review Comment:
   It requires Hadoop version 2.9.x+.





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r949825291


##########
seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/fs/OSSFileSystem.java:
##########
@@ -0,0 +1,997 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.oss.fs;
+
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MINIMUM_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DEFAULT_USER_AGENT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.DELETE_OBJECTS_ONETIME_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_CORE_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_ESTABLISH_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_KEEP_ALIVE_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAXIMUM_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_ERROR_RETRIES;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MAX_POOL_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_COPY_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_PART_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_MULTIPART_UPLOAD_THRESHOLD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_DOMAIN;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_HOST;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_PORT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_PROXY_WORKSTATION;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SECURE_CONNECTIONS;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.HADOOP_SOCKET_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.MAX_RETURNED_KEYS_LIMIT;
+import static org.apache.seatunnel.connectors.seatunnel.file.oss.fs.SmartOSSClientConfig.OSS_DEFAULT_BLOCK_SIZE;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSErrorCode;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.event.ProgressEvent;
+import com.aliyun.oss.event.ProgressListener;
+import com.aliyun.oss.model.CopyObjectRequest;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.PutObjectRequest;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * Hadoop File System implementation for Aliyun OSS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class OSSFileSystem extends FileSystem {
+
+    private URI uri;
+    private Path workingDir;
+    private String bucket;
+    private OSSClient client;
+
+    public static final Logger LOG = LoggerFactory.getLogger(OSSFileSystem.class);
+
+    /**
+     * Called after a new FileSystem instance is constructed.
+     *
+     * @param name URI
+     * @param conf configuration
+     * @throws IOException IOException
+     */
+    public void initialize(URI name, Configuration conf) throws IOException {

Review Comment:
   Because the Hadoop implementation does not add the @NonNull annotation, should we add it here? This method is called by the FileSystem itself, so I think the annotation is unnecessary. What do you think?





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r952115789


##########
pom.xml:
##########
@@ -534,6 +535,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-aliyun</artifactId>
+                <version>${hadoop-aliyun.version}</version>
+            </dependency>

Review Comment:
   It has a hadoop-common dependency.





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2467: [Feature][Connector-V2] Add oss source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2467:
URL: https://github.com/apache/incubator-seatunnel/pull/2467#discussion_r952112537


##########
pom.xml:
##########
@@ -534,6 +535,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-aliyun</artifactId>
+                <version>${hadoop-aliyun.version}</version>
+            </dependency>

Review Comment:
   Does this dependency have version requirements for Hadoop?


