Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/15 15:47:03 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1829ddc66 [Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)
1829ddc66 is described below

commit 1829ddc6627b1e8a6fd3ff422805ca3b287ecb67
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Mon Aug 15 23:46:58 2022 +0800

    [Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)
---
 .../connector-file/connector-file-base/pom.xml     |   9 +
 .../{FileFormat.java => BaseSourceConfig.java}     |  21 +-
 .../seatunnel/file/config/FileFormat.java          |  48 +-
 .../config/{FileFormat.java => HadoopConf.java}    |  20 +-
 .../FilePluginException.java}                      |  22 +-
 .../seatunnel/file/source/BaseFileSource.java      |  65 +++
 .../file/source/BaseFileSourceReader.java          |  92 ++++
 .../file/source/reader/AbstractReadStrategy.java   |  86 ++++
 .../file/source/reader/OrcReadStrategy.java        | 542 +++++++++++++++++++++
 .../file/source/reader/ParquetReadStrategy.java    | 186 +++++++
 .../seatunnel/file/source/reader/ReadStrategy.java |  42 ++
 .../reader/ReadStrategyFactory.java}               |  28 +-
 .../file/source/reader/TextReadStrategy.java       |  56 +++
 .../split/FileSourceSplit.java}                    |  23 +-
 .../source/split/FileSourceSplitEnumerator.java    | 126 +++++
 .../state/FileSourceState.java}                    |  24 +-
 16 files changed, 1296 insertions(+), 94 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 05e98573e..13173c14e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -30,11 +30,13 @@
     <artifactId>connector-file-base</artifactId>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-format-json</artifactId>
@@ -73,5 +75,12 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
similarity index 71%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index f9ca34409..16f1bada9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -17,22 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
-import java.io.Serializable;
-
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
-
-    private String suffix;
-
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
-    }
-
-    public String getSuffix() {
-        return "." + suffix;
-    }
+public class BaseSourceConfig {
+    public static final String FILE_TYPE = "type";
+    public static final String FILE_PATH = "path";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index f9ca34409..790e7ca64 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,22 +17,56 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
+
 import java.io.Serializable;
 
 public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+    CSV("csv") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    },
+    TEXT("txt") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    },
+    PARQUET("parquet") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new ParquetReadStrategy();
+        }
+    },
+    ORC("orc") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new OrcReadStrategy();
+        }
+    },
+    JSON("json") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    };
 
-    private String suffix;
+    private final String suffix;
 
-    private FileFormat(String suffix) {
+    FileFormat(String suffix) {
         this.suffix = suffix;
     }
 
     public String getSuffix() {
         return "." + suffix;
     }
+
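+    // default for formats without a dedicated strategy; the per-constant
+    // overrides above supply the real implementations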
+    public ReadStrategy getReadStrategy() {
+        return null;
+    }
 }
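
Each FileFormat constant now doubles as a strategy factory. A minimal usage
sketch (assuming the connector classes are on the classpath; note the base
getReadStrategy() returns null, so callers should guard any format added
without an override):

    ReadStrategy csv = FileFormat.CSV.getReadStrategy();   // TextReadStrategy
    ReadStrategy orc = FileFormat.ORC.getReadStrategy();   // OrcReadStrategy
    String suffix = FileFormat.PARQUET.getSuffix();        // ".parquet"
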
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
similarity index 74%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index f9ca34409..f3bbedf08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -17,22 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import lombok.Data;
+
 import java.io.Serializable;
 
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+@Data
+public class HadoopConf implements Serializable {
 
-    private String suffix;
+    private String hdfsNameKey;
 
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
-    }
+    private String fsHdfsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";
 
-    public String getSuffix() {
-        return "." + suffix;
+    public HadoopConf(String hdfsNameKey) {
+        this.hdfsNameKey = hdfsNameKey;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
similarity index 67%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
index f9ca34409..e747c6aea 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
@@ -15,24 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.exception;
 
-import java.io.Serializable;
-
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
-
-    private String suffix;
-
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
+public class FilePluginException extends Exception {
+    public FilePluginException(String message) {
+        super(message);
     }
 
-    public String getSuffix() {
-        return "." + suffix;
+    public FilePluginException(String message, Throwable cause) {
+        super(message, cause);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
new file mode 100644
index 000000000..02d1e024a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+
+import java.util.List;
+
+public abstract class BaseFileSource implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit, FileSourceState> {
+    protected SeaTunnelRowType rowType;
+    protected ReadStrategy readStrategy;
+    protected HadoopConf hadoopConf;
+    protected List<String> filePaths;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return rowType;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+        return new BaseFileSourceReader(readStrategy, hadoopConf, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) throws Exception {
+        return new FileSourceSplitEnumerator(enumeratorContext, filePaths);
+    }
+
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext, FileSourceState checkpointState) throws Exception {
+        return new FileSourceSplitEnumerator(enumeratorContext, filePaths, checkpointState);
+    }
+}
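
BaseFileSource leaves only configuration to its subclasses: a concrete
connector fills in rowType, readStrategy, hadoopConf and filePaths before the
engine asks for readers and enumerators. A hypothetical sketch of such a
subclass (not part of this commit; the prepare() lifecycle hook, Config type
and PrepareFailException signature are assumed from the SeaTunnel V2 plugin
API, and the HDFS URI is a placeholder):

    public class HdfsFileSource extends BaseFileSource {
        @Override
        public String getPluginName() {
            return "HdfsFile";
        }

        @Override
        public void prepare(Config pluginConfig) throws PrepareFailException {
            String type = pluginConfig.getString(BaseSourceConfig.FILE_TYPE);
            String path = pluginConfig.getString(BaseSourceConfig.FILE_PATH);
            readStrategy = ReadStrategyFactory.of(type);
            hadoopConf = new HadoopConf("hdfs://localhost:9000"); // placeholder fs.defaultFS
            try {
                filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
            } catch (Exception e) {
                throw new PrepareFailException("HdfsFile", PluginType.SOURCE, e.getMessage());
            }
        }
    }
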
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
new file mode 100644
index 000000000..fa13ec66c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSourceSplit> {
+    private static final long THREAD_WAIT_TIME = 500L;
+    private final ReadStrategy readStrategy;
+    private final HadoopConf hadoopConf;
+    private final SourceReader.Context context;
+    private final Set<FileSourceSplit> sourceSplits;
+
+    public BaseFileSourceReader(ReadStrategy readStrategy, HadoopConf hadoopConf, SourceReader.Context context) {
+        this.readStrategy = readStrategy;
+        this.hadoopConf = hadoopConf;
+        this.context = context;
+        this.sourceSplits = new HashSet<>();
+    }
+
+    @Override
+    public void open() throws Exception {
+        readStrategy.init(hadoopConf);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
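+        // bounded read: drain every assigned split in this poll, then tell
+        // the framework there is nothing left to emit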
+        sourceSplits.forEach(source -> {
+            try {
+                readStrategy.read(source.splitId(), output);
+            } catch (Exception e) {
+                throw new RuntimeException("File source read error", e);
+            }
+        });
+        context.signalNoMoreElement();
+    }
+
+    @Override
+    public List<FileSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<FileSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
new file mode 100644
index 000000000..eba6d4b21
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public abstract class AbstractReadStrategy implements ReadStrategy {
+    HadoopConf hadoopConf;
+
+    @Override
+    public void init(HadoopConf conf) {
+        this.hadoopConf = conf;
+    }
+
+    @Override
+    public Configuration getConfiguration(HadoopConf hadoopConf) {
+        Configuration configuration = new Configuration();
+        if (hadoopConf != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+        }
+        return configuration;
+    }
+
+    Configuration getConfiguration() throws FilePluginException {
+        if (null == hadoopConf) {
+            log.info("Local file reader doesn't need hadoopConf");
+        }
+        return getConfiguration(hadoopConf);
+    }
+
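+    // overridden by strategies that can validate a file's magic bytes;
+    // the default accepts any file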
+    boolean checkFileType(String path) {
+        return true;
+    }
+
+    @Override
+    public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
+        Configuration configuration = getConfiguration(hadoopConf);
+        List<String> fileNames = new ArrayList<>();
+        FileSystem hdfs = FileSystem.get(configuration);
+        Path listFiles = new Path(path);
+        FileStatus[] stats = hdfs.listStatus(listFiles);
+        for (FileStatus fileStatus : stats) {
+            if (fileStatus.isDirectory()) {
+                fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
+                continue;
+            }
+            if (fileStatus.isFile()) {
+                // filter '_SUCCESS' file
+                if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
+                    fileNames.add(fileStatus.getPath().toString());
+                }
+            }
+        }
+        return fileNames;
+    }
+
+}
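
getFileNamesByPath walks the tree depth-first, recursing into directories and
skipping Hadoop's _SUCCESS marker files. A minimal usage sketch (the URI and
path are placeholders):

    ReadStrategy strategy = new TextReadStrategy();
    HadoopConf conf = new HadoopConf("hdfs://localhost:9000");
    strategy.init(conf);
    // every regular file under the path, recursively, minus _SUCCESS markers
    List<String> files = strategy.getFileNamesByPath(conf, "/warehouse/events");
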
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
new file mode 100644
index 000000000..209965b63
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class OrcReadStrategy extends AbstractReadStrategy {
+
+    private SeaTunnelRowType seaTunnelRowTypeInfo;
+    private static final long MIN_SIZE = 16 * 1024;
+
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        if (!checkFileType(path)) {
+            throw new Exception(String.format("File [%s] is not a valid ORC file, please check the file type", path));
+        }
+        Configuration configuration = getConfiguration();
+        Path filePath = new Path(path);
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
+        try (Reader reader = OrcFile.createReader(filePath, readerOptions)) {
+            TypeDescription schema = reader.getSchema();
+            List<TypeDescription> children = schema.getChildren();
+            RecordReader rows = reader.rows();
+            VectorizedRowBatch rowBatch = reader.getSchema().createRowBatch();
+            while (rows.nextBatch(rowBatch)) {
+                int numCols = rowBatch.numCols;
+                ColumnVector[] cols = rowBatch.cols;
+                for (int rowNum = 0; rowNum < rowBatch.size; rowNum++) {
+                    Object[] fields = new Object[numCols];
+                    for (int j = 0; j < numCols; j++) {
+                        fields[j] = cols[j] == null ? null : readColumn(cols[j], children.get(j), rowNum);
+                    }
+                    output.collect(new SeaTunnelRow(fields));
+                }
+            }
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+        if (null != seaTunnelRowTypeInfo) {
+            return seaTunnelRowTypeInfo;
+        }
+        Configuration configuration = getConfiguration(hadoopConf);
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
+        Path dstDir = new Path(path);
+        try (Reader reader = OrcFile.createReader(dstDir, readerOptions)) {
+            TypeDescription schema = reader.getSchema();
+            String[] fields = new String[schema.getFieldNames().size()];
+            SeaTunnelDataType<?>[] types = new SeaTunnelDataType[schema.getFieldNames().size()];
+            for (int i = 0; i < schema.getFieldNames().size(); i++) {
+                fields[i] = schema.getFieldNames().get(i);
+                types[i] = BasicType.STRING_TYPE;
+            }
+            seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
+            return seaTunnelRowTypeInfo;
+        } catch (IOException e) {
+            throw new FilePluginException("Create OrcReader failed", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    boolean checkFileType(String path) {
+        try {
+            Configuration configuration = getConfiguration();
+            FileSystem fileSystem = FileSystem.get(configuration);
+            Path filePath = new Path(path);
+            // try-with-resources so the stream is closed even when a read fails
+            try (FSDataInputStream in = fileSystem.open(filePath)) {
+                // try to get the Postscript at the tail of the orc file
+                long size = fileSystem.getFileStatus(filePath).getLen();
+                int readSize = (int) Math.min(size, MIN_SIZE);
+                in.seek(size - readSize);
+                ByteBuffer buffer = ByteBuffer.allocate(readSize);
+                in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                int psLen = buffer.get(readSize - 1) & 0xff;
+                int len = OrcFile.MAGIC.length();
+                if (psLen < len + 1) {
+                    return false;
+                }
+                int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
+                byte[] array = buffer.array();
+                if (Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
+                    return true;
+                }
+                // if the magic isn't at the tail, this may be the 0.11.0 version
+                // of ORC; read the first bytes of the file to check for the header
+                in.seek(0);
+                byte[] header = new byte[len];
+                in.readFully(header, 0, len);
+                // if it isn't there either, this isn't an ORC file
+                return Text.decode(header, 0, len).equals(OrcFile.MAGIC);
+            }
+        } catch (FilePluginException | IOException e) {
+            String errorMsg = String.format("Check orc file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    private Object readColumn(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object columnObj = null;
+        if (!colVec.isNull[rowNum]) {
+            switch (colVec.type) {
+                case LONG:
+                    columnObj = readLongVal(colVec, colType, rowNum);
+                    break;
+                case DOUBLE:
+                    columnObj = ((DoubleColumnVector) colVec).vector[rowNum];
+                    break;
+                case BYTES:
+                    columnObj = readBytesVal(colVec, rowNum);
+                    break;
+                case DECIMAL:
+                    columnObj = readDecimalVal(colVec, rowNum);
+                    break;
+                case TIMESTAMP:
+                    columnObj = readTimestampVal(colVec, colType, rowNum);
+                    break;
+                case STRUCT:
+                    columnObj = readStructVal(colVec, colType, rowNum);
+                    break;
+                case LIST:
+                    columnObj = readListVal(colVec, colType, rowNum);
+                    break;
+                case MAP:
+                    columnObj = readMapVal(colVec, colType, rowNum);
+                    break;
+                case UNION:
+                    columnObj = readUnionVal(colVec, colType, rowNum);
+                    break;
+                default:
+                    throw new RuntimeException(
+                            "ReadColumn: unsupported ORC file column type: " + colVec.type.name()
+                    );
+            }
+        }
+        return columnObj;
+    }
+
+    private Object readLongVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object colObj = null;
+        if (!colVec.isNull[rowNum]) {
+            LongColumnVector longVec = (LongColumnVector) colVec;
+            long longVal = longVec.vector[rowNum];
+            colObj = longVal;
+            if (colType.getCategory() == TypeDescription.Category.INT) {
+                colObj = (int) longVal;
+            } else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
+                colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
+            } else if (colType.getCategory() == TypeDescription.Category.DATE) {
+                // ORC DATE values are days since the epoch, not milliseconds
+                colObj = new Date(TimeUnit.DAYS.toMillis(longVal));
+            }
+        }
+        return colObj;
+    }
+
+    private Object readBytesVal(ColumnVector colVec, int rowNum) {
+        Object bytesObj = null;
+        if (!colVec.isNull[rowNum]) {
+            BytesColumnVector bytesVector = (BytesColumnVector) colVec;
+            bytesObj = bytesVector.toString(rowNum);
+        }
+        return bytesObj;
+    }
+
+    private Object readDecimalVal(ColumnVector colVec, int rowNum) {
+        Object decimalObj = null;
+        if (!colVec.isNull[rowNum]) {
+            DecimalColumnVector decimalVec = (DecimalColumnVector) colVec;
+            decimalObj = decimalVec.vector[rowNum].getHiveDecimal().bigDecimalValue();
+        }
+        return decimalObj;
+    }
+
+    private Object readTimestampVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object timestampVal = null;
+        if (!colVec.isNull[rowNum]) {
+            TimestampColumnVector timestampVec = (TimestampColumnVector) colVec;
+            int nanos = timestampVec.nanos[rowNum];
+            long millis = timestampVec.time[rowNum];
+            Timestamp timestamp = new Timestamp(millis);
+            timestamp.setNanos(nanos);
+            timestampVal = timestamp;
+            if (colType.getCategory() == TypeDescription.Category.DATE) {
+                timestampVal = new Date(timestamp.getTime());
+            }
+        }
+        return timestampVal;
+    }
+
+    private Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object structObj = null;
+        if (!colVec.isNull[rowNum]) {
+            List<Object> fieldValList = new ArrayList<>();
+            StructColumnVector structVector = (StructColumnVector) colVec;
+            ColumnVector[] fieldVec = structVector.fields;
+            List<TypeDescription> fieldTypes = colType.getChildren();
+            for (int i = 0; i < fieldVec.length; i++) {
+                Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), rowNum);
+                fieldValList.add(fieldObj);
+            }
+            structObj = fieldValList;
+        }
+        return structObj;
+    }
+
+    private Object readMapVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Map<Object, Object> objMap = new HashMap<>();
+        MapColumnVector mapVector = (MapColumnVector) colVec;
+        if (checkMapColumnVectorTypes(mapVector)) {
+            int mapSize = (int) mapVector.lengths[rowNum];
+            int offset = (int) mapVector.offsets[rowNum];
+            List<TypeDescription> mapTypes = colType.getChildren();
+            TypeDescription keyType = mapTypes.get(0);
+            TypeDescription valueType = mapTypes.get(1);
+            ColumnVector keyChild = mapVector.keys;
+            ColumnVector valueChild = mapVector.values;
+            List<Object> keyList = readMapVector(keyChild, keyType, offset, mapSize);
+            List<Object> valueList = readMapVector(valueChild, valueType, offset, mapSize);
+            for (int i = 0; i < keyList.size(); i++) {
+                objMap.put(keyList.get(i), valueList.get(i));
+            }
+        } else {
+            throw new RuntimeException("readMapVal: unsupported key or value types");
+        }
+        return objMap;
+    }
+
+    private boolean checkMapColumnVectorTypes(MapColumnVector mapVector) {
+        ColumnVector.Type keyType = mapVector.keys.type;
+        ColumnVector.Type valueType = mapVector.values.type;
+        // parenthesize explicitly: && binds tighter than ||, so without the
+        // grouping the key and value checks would mix together
+        return (keyType == ColumnVector.Type.BYTES ||
+                keyType == ColumnVector.Type.LONG ||
+                keyType == ColumnVector.Type.DOUBLE)
+                &&
+                (valueType == ColumnVector.Type.LONG ||
+                valueType == ColumnVector.Type.DOUBLE ||
+                valueType == ColumnVector.Type.BYTES ||
+                valueType == ColumnVector.Type.DECIMAL ||
+                valueType == ColumnVector.Type.TIMESTAMP);
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
+        List<Object> mapList;
+        switch (mapVector.type) {
+            case BYTES:
+                mapList =
+                        (List<Object>) readBytesListVector(
+                                (BytesColumnVector) mapVector,
+                                childType,
+                                offset,
+                                numValues
+                        );
+                break;
+            case LONG:
+                mapList =
+                        readLongListVector(
+                                (LongColumnVector) mapVector,
+                                childType,
+                                offset,
+                                numValues
+                        );
+                break;
+            case DOUBLE:
+                mapList =
+                        (List<Object>) readDoubleListVector(
+                                (DoubleColumnVector) mapVector,
+                                offset,
+                                numValues
+                        );
+                break;
+            case DECIMAL:
+                mapList =
+                        (List<Object>) readDecimalListVector(
+                                (DecimalColumnVector) mapVector,
+                                offset,
+                                numValues
+                        );
+                break;
+            case TIMESTAMP:
+                mapList =
+                        (List<Object>) readTimestampListVector(
+                                (TimestampColumnVector) mapVector,
+                                childType,
+                                offset,
+                                numValues
+                        );
+                break;
+            default:
+                throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
+        }
+        return mapList;
+    }
+
+    private Object readUnionVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Pair<TypeDescription, Object> columnValuePair;
+        UnionColumnVector unionVector = (UnionColumnVector) colVec;
+        int tagVal = unionVector.tags[rowNum];
+        List<TypeDescription> unionFieldTypes = colType.getChildren();
+        if (tagVal < unionFieldTypes.size()) {
+            TypeDescription fieldType = unionFieldTypes.get(tagVal);
+            if (tagVal < unionVector.fields.length) {
+                ColumnVector fieldVector = unionVector.fields[tagVal];
+                Object unionValue = readColumn(fieldVector, fieldType, rowNum);
+                columnValuePair = Pair.of(fieldType, unionValue);
+            } else {
+                throw new RuntimeException(
+                        "readUnionVal: union tag value out of range for union column vectors"
+                );
+            }
+        } else {
+            throw new RuntimeException(
+                    "readUnionVal: union tag value out of range for union types"
+            );
+        }
+        return columnValuePair;
+    }
+
+    private Object readListVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object listValues = null;
+        if (!colVec.isNull[rowNum]) {
+            ListColumnVector listVector = (ListColumnVector) colVec;
+            ColumnVector listChildVector = listVector.child;
+            TypeDescription childType = colType.getChildren().get(0);
+            switch (listChildVector.type) {
+                case LONG:
+                    listValues = readLongListValues(listVector, childType, rowNum);
+                    break;
+                case DOUBLE:
+                    listValues = readDoubleListValues(listVector, rowNum);
+                    break;
+                case BYTES:
+                    listValues = readBytesListValues(listVector, childType, rowNum);
+                    break;
+                case DECIMAL:
+                    listValues = readDecimalListValues(listVector, rowNum);
+                    break;
+                case TIMESTAMP:
+                    listValues = readTimestampListValues(listVector, childType, rowNum);
+                    break;
+                default:
+                    throw new RuntimeException(
+                            listVector.type.name() + " is not supported for ListColumnVectors"
+                    );
+            }
+        }
+        return listValues;
+    }
+
+    private Object readLongListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+        int offset = (int) listVector.offsets[rowNum];
+        int numValues = (int) listVector.lengths[rowNum];
+        LongColumnVector longVector = (LongColumnVector) listVector.child;
+        return readLongListVector(longVector, childType, offset, numValues);
+    }
+
+    private List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
+        List<Object> longList = new ArrayList<>();
+        for (int i = 0; i < numValues; i++) {
+            if (!longVector.isNull[offset + i]) {
+                long longVal = longVector.vector[offset + i];
+                if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
+                    longList.add(longVal != 0);
+                } else if (childType.getCategory() == TypeDescription.Category.INT) {
+                    longList.add((int) longVal);
+                } else {
+                    longList.add(longVal);
+                }
+            } else {
+                longList.add(null);
+            }
+        }
+        return longList;
+    }
+
+    private Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
+        int offset = (int) listVector.offsets[rowNum];
+        int numValues = (int) listVector.lengths[rowNum];
+        DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
+        return readDoubleListVector(doubleVec, offset, numValues);
+    }
+
+    private Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
+        List<Object> doubleList = new ArrayList<>();
+        for (int i = 0; i < numValues; i++) {
+            if (!doubleVec.isNull[offset + i]) {
+                Double doubleVal = doubleVec.vector[offset + i];
+                doubleList.add(doubleVal);
+            } else {
+                doubleList.add(null);
+            }
+        }
+        return doubleList;
+    }
+
+    private Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+        int offset = (int) listVector.offsets[rowNum];
+        int numValues = (int) listVector.lengths[rowNum];
+        BytesColumnVector bytesVec = (BytesColumnVector) listVector.child;
+        return readBytesListVector(bytesVec, childType, offset, numValues);
+    }
+
+    private Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
+        List<Object> bytesValList = new ArrayList<>();
+        for (int i = 0; i < numValues; i++) {
+            if (!bytesVec.isNull[offset + i]) {
+                byte[] byteArray = bytesVec.vector[offset + i];
+                int vecLen = bytesVec.length[offset + i];
+                int vecStart = bytesVec.start[offset + i];
+                byte[] vecCopy = Arrays.copyOfRange(byteArray, vecStart, vecStart + vecLen);
+                if (childType.getCategory() == TypeDescription.Category.STRING) {
+                    // decode as UTF-8 explicitly rather than the platform default charset
+                    bytesValList.add(new String(vecCopy, StandardCharsets.UTF_8));
+                } else {
+                    bytesValList.add(vecCopy);
+                }
+            } else {
+                bytesValList.add(null);
+            }
+        }
+        return bytesValList;
+    }
+
+    private Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
+        int offset = (int) listVector.offsets[rowNum];
+        int numValues = (int) listVector.lengths[rowNum];
+        DecimalColumnVector decimalVec = (DecimalColumnVector) listVector.child;
+        return readDecimalListVector(decimalVec, offset, numValues);
+    }
+
+    private Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
+        List<Object> decimalList = new ArrayList<>();
+        for (int i = 0; i < numValues; i++) {
+            if (!decimalVector.isNull[offset + i]) {
+                // index with offset + i to stay aligned with the isNull check above
+                BigDecimal bigDecimal = decimalVector.vector[offset + i].getHiveDecimal().bigDecimalValue();
+                decimalList.add(bigDecimal);
+            } else {
+                decimalList.add(null);
+            }
+        }
+        return decimalList;
+    }
+
+    private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+        int offset = (int) listVector.offsets[rowNum];
+        int numValues = (int) listVector.lengths[rowNum];
+        TimestampColumnVector timestampVec = (TimestampColumnVector) listVector.child;
+        return readTimestampListVector(timestampVec, childType, offset, numValues);
+    }
+
+    private Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
+        List<Object> timestampList = new ArrayList<>();
+        for (int i = 0; i < numValues; i++) {
+            if (!timestampVector.isNull[offset + i]) {
+                int nanos = timestampVector.nanos[offset + i];
+                long millis = timestampVector.time[offset + i];
+                Timestamp timestamp = new Timestamp(millis);
+                timestamp.setNanos(nanos);
+                if (childType.getCategory() == TypeDescription.Category.DATE) {
+                    Date date = new Date(timestamp.getTime());
+                    timestampList.add(date);
+                } else {
+                    timestampList.add(timestamp);
+                }
+            } else {
+                timestampList.add(null);
+            }
+        }
+        return timestampList;
+    }
+}
+
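For now getSeaTunnelRowTypeInfo surfaces every ORC column as STRING, even
though read() materializes native Java values per column category, so the
declared row type and the emitted values diverge until the schema mapping is
refined. A standalone sketch of the schema-discovery path (the URI and path
are placeholders):

    OrcReadStrategy strategy = new OrcReadStrategy();
    HadoopConf conf = new HadoopConf("hdfs://localhost:9000");
    strategy.init(conf);
    // field names come from the ORC TypeDescription; all types map to STRING
    SeaTunnelRowType rowType = strategy.getSeaTunnelRowTypeInfo(conf, "/data/part-00000.orc");
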
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
new file mode 100644
index 000000000..82e05beba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ParquetReadStrategy extends AbstractReadStrategy {
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        if (!checkFileType(path)) {
+            throw new Exception(String.format("File [%s] is not a valid parquet file, please check the file type", path));
+        }
+        Path filePath = new Path(path);
+        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+        int fieldsCount = seaTunnelRowType.getTotalFields();
+        GenericRecord record;
+        try (ParquetReader<GenericData.Record> reader = AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
+            while ((record = reader.read()) != null) {
+                Object[] fields = new Object[fieldsCount];
+                for (int i = 0; i < fieldsCount; i++) {
+                    Object data = record.get(i);
+                    try {
+                        if (data instanceof GenericData.Fixed) {
+                            // decimal values arrive as Avro fixed; convert to a plain string
+                            data = fixed2String((GenericData.Fixed) data);
+                        } else if (data instanceof ArrayList) {
+                            // array values arrive as Avro records; serialize to a JSON string
+                            data = array2String((ArrayList<GenericData.Record>) data);
+                        }
+                    } catch (Exception e) {
+                        data = record.get(i);
+                    } finally {
+                        // guard against null fields to avoid a NullPointerException
+                        fields[i] = data == null ? null : data.toString();
+                    }
+                }
+                output.collect(new SeaTunnelRow(fields));
+            }
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+        if (seaTunnelRowType != null) {
+            return seaTunnelRowType;
+        }
+        Configuration configuration = getConfiguration(hadoopConf);
+        Path filePath = new Path(path);
+        ParquetMetadata metadata;
+        try {
+            metadata = ParquetFileReader.readFooter(configuration, filePath);
+        } catch (IOException e) {
+            throw new FilePluginException("Create parquet reader failed", e);
+        }
+        FileMetaData fileMetaData = metadata.getFileMetaData();
+        MessageType schema = fileMetaData.getSchema();
+        int fieldCount = schema.getFieldCount();
+        String[] fields = new String[fieldCount];
+        SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = schema.getFieldName(i);
+            // Temporarily each field is treated as a string type
+            // I think we can use the schema information to build seatunnel column type
+            types[i] = BasicType.STRING_TYPE;
+        }
+        seaTunnelRowType = new SeaTunnelRowType(fields, types);
+        return seaTunnelRowType;
+    }
+
+    private String fixed2String(GenericData.Fixed fixed) {
+        Schema schema = fixed.getSchema();
+        byte[] bytes = fixed.bytes();
+        int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
+        int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
+        BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
+        return bigDecimal.toString();
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int scale) {
+        Binary value = Binary.fromConstantByteArray(bytesArray);
+        if (precision <= 18) {
+            ByteBuffer buffer = value.toByteBuffer();
+            byte[] bytes = buffer.array();
+            int start = buffer.arrayOffset() + buffer.position();
+            int end = buffer.arrayOffset() + buffer.limit();
+            long unscaled = 0L;
+            int i = start;
+            while (i < end) {
+                unscaled = unscaled << 8 | bytes[i] & 0xff;
+                i++;
+            }
+            int bits = 8 * (end - start);
+            long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+            if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >= Math.pow(10, 18)) {
+                return new BigDecimal(unscaledNew);
+            } else {
+                // build from the unscaled long directly instead of dividing
+                // doubles, which would lose precision
+                return BigDecimal.valueOf(unscaledNew, scale);
+            }
+        } else {
+            return new BigDecimal(new BigInteger(value.getBytes()), scale);
+        }
+    }
+
+    @Override
+    boolean checkFileType(String path) {
+        boolean checkResult;
+        byte[] magic = new byte[PARQUET_MAGIC.length];
+        try {
+            Configuration configuration = getConfiguration();
+            FileSystem fileSystem = FileSystem.get(configuration);
+            Path filePath = new Path(path);
+            // try-with-resources so the stream is closed even when a read fails
+            try (FSDataInputStream in = fileSystem.open(filePath)) {
+                // read the magic header of a parquet file
+                in.seek(0);
+                in.readFully(magic);
+                checkResult = Arrays.equals(magic, PARQUET_MAGIC);
+            }
+            return checkResult;
+        } catch (FilePluginException | IOException e) {
+            String errorMsg = String.format("Check parquet file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
+        return objectMapper.writeValueAsString(values);
+    }
+}
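
bytes2Decimal reverses Parquet's fixed-width decimal encoding: the bytes hold
a big-endian two's-complement unscaled integer, and the scale shifts the
decimal point. A worked example with made-up input bytes:

    // 0x04D2 is the two's-complement unscaled value 1234; with scale 2
    // the decoded decimal is 12.34
    byte[] raw = {0x04, (byte) 0xD2};
    BigDecimal d = new BigDecimal(new BigInteger(raw), 2);
    System.out.println(d); // 12.34
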
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
new file mode 100644
index 000000000..1114eab65
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+public interface ReadStrategy extends Serializable {
+    void init(HadoopConf conf);
+
+    Configuration getConfiguration(HadoopConf conf);
+
+    void read(String path, Collector<SeaTunnelRow> output) throws Exception;
+
+    SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException;
+
+    List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
+}
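The interface above is the full contract a format reader implements, and its methods are meant to be driven in a fixed order: bind the Hadoop configuration, discover the schema, enumerate the files, then read each one. A rough driver sketch under that assumption (readAll is a hypothetical helper placed in the same package; the HadoopConf and Collector instances come from the surrounding source plumbing):

    import org.apache.seatunnel.api.source.Collector;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
    import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

    public class ReadStrategyDriverSketch {
        static void readAll(ReadStrategy strategy, HadoopConf conf, String dir,
                            Collector<SeaTunnelRow> output) throws Exception {
            strategy.init(conf);                           // bind the cluster config
            // the schema would normally feed the deserialization layer
            SeaTunnelRowType rowType = strategy.getSeaTunnelRowTypeInfo(conf, dir);
            for (String file : strategy.getFileNamesByPath(conf, dir)) {
                strategy.read(file, output);               // emit rows per file
            }
        }
    }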
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
similarity index 53%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index f9ca34409..f12f2c2dc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+import lombok.extern.slf4j.Slf4j;
 
-    private String suffix;
+@Slf4j
+public class ReadStrategyFactory {
 
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
-    }
+    private ReadStrategyFactory() {}
 
-    public String getSuffix() {
-        return "." + suffix;
+    public static ReadStrategy of(String fileType) {
+        try {
+            FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
+            return fileFormat.getReadStrategy();
+        } catch (IllegalArgumentException e) {
+            String errorMsg = String.format("File source connector does not support this file type [%s], please check your config", fileType);
+            // keep the original exception as the cause so the bad value stays traceable
+            throw new RuntimeException(errorMsg, e);
+        }
     }
 }
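Using the factory is a one-liner; the lookup goes through the FileFormat enum, so the type string from the user config is matched case-insensitively. A small usage sketch:

    // "text", "csv", "parquet", "orc" and "json" all resolve through FileFormat
    ReadStrategy textStrategy = ReadStrategyFactory.of("text");
    ReadStrategy orcStrategy = ReadStrategyFactory.of("ORC"); // case-insensitive
    // an unrecognized type such as "xml" triggers the RuntimeException above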
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
new file mode 100644
index 000000000..61e6ede77
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+public class TextReadStrategy extends AbstractReadStrategy {
+
+    private static final String TEXT_FIELD_NAME = "lines";
+
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
+        Configuration conf = getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        Path filePath = new Path(path);
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
+        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
+                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+    }
+}
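TextReadStrategy does no parsing at all: every line of the file becomes a one-field row under the fixed single-column schema it reports. For illustration, the shape of that schema and of an emitted row:

    // the schema getSeaTunnelRowTypeInfo returns: one STRING column named "lines"
    SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"lines"},
            new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});

    // each text line is collected as a single-field row
    SeaTunnelRow row = new SeaTunnelRow(new String[]{"first line of the file"});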
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
similarity index 67%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
index f9ca34409..50161e717 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
-import java.io.Serializable;
+import org.apache.seatunnel.api.source.SourceSplit;
 
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+public class FileSourceSplit implements SourceSplit {
+    private final String splitId;
 
-    private String suffix;
-
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
+    public FileSourceSplit(String splitId) {
+        this.splitId = splitId;
     }
 
-    public String getSuffix() {
-        return "." + suffix;
+    @Override
+    public String splitId() {
+        return this.splitId;
+    }
+
+    // splits are held in hash-based sets by the enumerator, so equality
+    // must follow the split id rather than object identity
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        return splitId.equals(((FileSourceSplit) obj).splitId);
+    }
+
+    @Override
+    public int hashCode() {
+        return splitId.hashCode();
+    }
 }
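With the id-based equality above, hash-based collections deduplicate splits that point at the same file, which matters when readers hand splits back through addSplitsBack. A quick illustration with a made-up path:

    Set<FileSourceSplit> splits = new HashSet<>();
    splits.add(new FileSourceSplit("/data/part-0000.txt"));
    splits.add(new FileSourceSplit("/data/part-0000.txt")); // same splitId, deduplicated
    // splits.size() == 1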
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
new file mode 100644
index 000000000..fa3cebd28
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
+    private final Context<FileSourceSplit> context;
+    private Set<FileSourceSplit> pendingSplit;
+    private Set<FileSourceSplit> assignedSplit;
+    private final List<String> filePaths;
+
+    public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
+        this.context = context;
+        this.filePaths = filePaths;
+    }
+
+    public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths,
+                                     FileSourceState sourceState) {
+        this(context, filePaths);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        // only create a fresh set when no state was restored through the
+        // recovery constructor, otherwise the restored splits would be lost
+        if (this.assignedSplit == null) {
+            this.assignedSplit = new HashSet<>();
+        }
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getFileSplits();
+        assignSplit(context.registeredReaders());
+    }
+
+    private Set<FileSourceSplit> getFileSplits() {
+        // one split per discovered file path
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        filePaths.forEach(path -> fileSourceSplits.add(new FileSourceSplit(path)));
+        return fileSourceSplits;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {
+        Map<Integer, List<FileSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
+        // getSplitOwner yields an index into the task list, not a task id,
+        // so keep an indexed view of the ids that splits are assigned to
+        List<Integer> taskIDs = new ArrayList<>(taskIDList);
+        for (int taskID : taskIDs) {
+            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+        }
+        pendingSplit.forEach(s ->
+                readySplit.get(taskIDs.get(getSplitOwner(s.splitId(), taskIDs.size()))).add(s));
+        readySplit.forEach(context::assignSplit);
+        assignedSplit.addAll(pendingSplit);
+        pendingSplit.clear();
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        // mask the sign bit: String.hashCode() can be negative, and a negative
+        // modulo result would index a reader that does not exist
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public FileSourceState snapshotState(long checkpointId) {
+        return new FileSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+}
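The enumerator shards files across readers by hashing the split id, so the file-to-reader mapping is deterministic for a given file list and reader count. A standalone sketch of the owner computation used above, with an illustrative path:

    static int getSplitOwner(String splitId, int numReaders) {
        // mask the sign bit so a negative hashCode() cannot produce a negative index
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    // with 2 readers, getSplitOwner("/data/part-0000.txt", 2) returns the same
    // reader index (0 or 1) on every run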
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
similarity index 60%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
index f9ca34409..582618dac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.state;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
-public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+import java.io.Serializable;
+import java.util.Set;
 
-    private String suffix;
+public class FileSourceState implements Serializable {
+    private static final long serialVersionUID = 9208369906513934611L;
+    private final Set<FileSourceSplit> assignedSplit;
 
-    private FileFormat(String suffix) {
-        this.suffix = suffix;
+    public FileSourceState(Set<FileSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
     }
 
-    public String getSuffix() {
-        return "." + suffix;
+    public Set<FileSourceSplit> getAssignedSplit() {
+        return assignedSplit;
     }
 }