Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/23 10:01:57 UTC

[incubator-seatunnel] branch dev updated: [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 61720306e [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)
61720306e is described below

commit 61720306e7d24136fa7a688ff563b242fa36b593
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Sep 23 18:01:50 2022 +0800

    [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)
    
    * Fix Orc Read in Flink
    
    * fix license header
    
    * exclude orc-core from flink-orc
    
    exclude orc-core from seatunnel-core-base
    
    * only connector-file/pom.xml needs to add the shade plugin and connector.name properties
---
 .../pom.xml                                        | 24 ++++--
 .../file/hdfs/sink/BaseHdfsFileSink.java}          | 12 +--
 .../file/hdfs/source/BaseHdfsFileSource.java}      | 16 +---
 .../file/hdfs/source/config/HdfsSourceConfig.java  |  0
 .../connector-file/connector-file-base/pom.xml     | 31 +++----
 .../file/sink/writer/OrcWriteStrategy.java         | 24 +++---
 .../file/source/reader/OrcReadStrategy.java        | 98 +++++++++++-----------
 .../connector-file/connector-file-ftp/pom.xml      |  4 -
 .../connector-file/connector-file-hadoop/pom.xml   | 10 +--
 .../seatunnel/file/hdfs/sink/HdfsFileSink.java     | 14 +---
 .../seatunnel/file/hdfs/source/HdfsFileSource.java | 40 +--------
 .../connector-file/connector-file-local/pom.xml    |  4 -
 .../connector-file/connector-file-oss/pom.xml      |  1 -
 seatunnel-connectors-v2/connector-file/pom.xml     | 10 ++-
 seatunnel-connectors-v2/connector-hive/pom.xml     | 55 +++++++++++-
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 12 +--
 .../seatunnel/hive/source/HiveSource.java          |  6 +-
 17 files changed, 174 insertions(+), 187 deletions(-)
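
Context for the diff below: OrcWriteStrategy and OrcReadStrategy switch their vectorized column imports from org.apache.hadoop.hive.ql.exec.vector to org.apache.orc.storage.ql.exec.vector, the relocated copies shipped by the nohive classifier of orc-core; together with the orc-core exclusions noted in the commit message, this keeps the connectors off Hive's storage-api classes when running on Flink. As a minimal sketch, the dependency declaration this relies on mirrors the connector-file-base/pom.xml hunk further down (explanatory comments added here for illustration, not part of the commit):

    <dependency>
        <groupId>org.apache.orc</groupId>
        <artifactId>orc-core</artifactId>
        <version>${orc.version}</version>
        <!-- nohive classifier bundles the storage-api classes under org.apache.orc.storage -->
        <classifier>nohive</classifier>
        <exclusions>
            <!-- Hadoop classes are supplied by flink-shaded-hadoop-2 instead -->
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

The org.apache.orc relocation added to connector-file/pom.xml and connector-hive/pom.xml then shades these classes per connector so they cannot collide with other ORC copies at runtime.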

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
similarity index 71%
copy from seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
index 93f000fcb..e8664068f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
@@ -27,11 +27,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-file-local</artifactId>
-
-    <properties>
-        <connector.name>file.local</connector.name>
-    </properties>
+    <artifactId>connector-file-base-hadoop</artifactId>
 
     <dependencies>
         <dependency>
@@ -42,7 +38,25 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <!-- make sure that flatten runs after maven-shade-plugin -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>flatten-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
similarity index 83%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index a484a6345..e8cbad443 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -20,25 +20,15 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseFileSink {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
+public abstract class BaseHdfsFileSink extends BaseFileSink {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
similarity index 86%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 45298c8b0..329ac60f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,12 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
@@ -32,17 +30,9 @@ import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseFileSource {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
+public abstract class BaseHdfsFileSource extends BaseFileSource {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
@@ -62,8 +52,8 @@ public class HdfsFileSource extends BaseFileSource {
         if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
             Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
             rowType = SeaTunnelSchema
-                    .buildWithConfig(schemaConfig)
-                    .getSeaTunnelRowType();
+                .buildWithConfig(schemaConfig)
+                .getSeaTunnelRowType();
             readStrategy.setSeaTunnelRowTypeInfo(rowType);
         } else {
             try {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
similarity index 100%
rename from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
rename to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index f2e425e6e..bc6d7b301 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -28,7 +28,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>connector-file-base</artifactId>
-    
+
     <properties>
         <commons-net.version>3.6</commons-net.version>
         <orc.version>1.5.6</orc.version>
@@ -37,7 +37,7 @@
         <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
         <parquet-avro.version>1.12.3</parquet-avro.version>
     </properties>
-    
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -67,6 +67,12 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
             <scope>test</scope>
         </dependency>
 
@@ -92,11 +98,16 @@
             <groupId>org.apache.orc</groupId>
             <artifactId>orc-core</artifactId>
             <version>${orc.version}</version>
+            <classifier>nohive</classifier>
             <exclusions>
                 <exclusion>
                     <artifactId>hadoop-common</artifactId>
                     <groupId>org.apache.hadoop</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -116,7 +127,6 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
         </dependency>
-
     </dependencies>
 
     <build>
@@ -124,18 +134,9 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <!-- base module need skip shading -->
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
             </plugin>
             <!-- make sure that flatten runs after maven-shade-plugin -->
             <plugin>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 875504e9b..0b9ed40aa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -24,15 +24,15 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkCo
 
 import lombok.NonNull;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -91,12 +91,12 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
             Path path = new Path(filePath);
             try {
                 OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
-                        .setSchema(schema)
-                        // temporarily used snappy
-                        .compress(CompressionKind.SNAPPY)
-                        // use orc version 0.12
-                        .version(OrcFile.Version.V_0_12)
-                        .overwrite(true);
+                    .setSchema(schema)
+                    // temporarily used snappy
+                    .compress(CompressionKind.SNAPPY)
+                    // use orc version 0.12
+                    .version(OrcFile.Version.V_0_12)
+                    .overwrite(true);
                 Writer newWriter = OrcFile.createWriter(path, options);
                 this.beingWrittenWriter.put(filePath, newWriter);
                 return newWriter;
@@ -170,7 +170,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
         if (value instanceof Boolean) {
             Boolean bool = (Boolean) value;
             longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
-        }  else if (value instanceof Integer) {
+        } else if (value instanceof Integer) {
             longVector.vector[row] = ((Integer) value).longValue();
         } else if (value instanceof Long) {
             longVector.vector[row] = (Long) value;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 3ebffb5a1..5b946b769 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -36,22 +36,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -381,15 +381,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         ColumnVector.Type keyType = mapVector.keys.type;
         ColumnVector.Type valueType = mapVector.values.type;
         return
-                keyType == ColumnVector.Type.BYTES ||
-                        keyType == ColumnVector.Type.LONG ||
-                        keyType == ColumnVector.Type.DOUBLE
-                        &&
-                        valueType == ColumnVector.Type.LONG ||
-                        valueType == ColumnVector.Type.DOUBLE ||
-                        valueType == ColumnVector.Type.BYTES ||
-                        valueType == ColumnVector.Type.DECIMAL ||
-                        valueType == ColumnVector.Type.TIMESTAMP;
+            keyType == ColumnVector.Type.BYTES ||
+                keyType == ColumnVector.Type.LONG ||
+                keyType == ColumnVector.Type.DOUBLE
+                    &&
+                    valueType == ColumnVector.Type.LONG ||
+                valueType == ColumnVector.Type.DOUBLE ||
+                valueType == ColumnVector.Type.BYTES ||
+                valueType == ColumnVector.Type.DECIMAL ||
+                valueType == ColumnVector.Type.TIMESTAMP;
     }
 
     private Object[] readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
@@ -397,47 +397,47 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         switch (mapVector.type) {
             case BYTES:
                 mapList =
-                        readBytesListVector(
-                                (BytesColumnVector) mapVector,
-                                childType,
-                                offset,
-                                numValues
-                        );
+                    readBytesListVector(
+                        (BytesColumnVector) mapVector,
+                        childType,
+                        offset,
+                        numValues
+                    );
                 break;
             case LONG:
                 mapList =
-                        readLongListVector(
-                                (LongColumnVector) mapVector,
-                                childType,
-                                offset,
-                                numValues
-                        );
+                    readLongListVector(
+                        (LongColumnVector) mapVector,
+                        childType,
+                        offset,
+                        numValues
+                    );
                 break;
             case DOUBLE:
                 mapList =
-                        readDoubleListVector(
-                                (DoubleColumnVector) mapVector,
-                                childType,
-                                offset,
-                                numValues
-                        );
+                    readDoubleListVector(
+                        (DoubleColumnVector) mapVector,
+                        childType,
+                        offset,
+                        numValues
+                    );
                 break;
             case DECIMAL:
                 mapList =
-                        readDecimalListVector(
-                                (DecimalColumnVector) mapVector,
-                                offset,
-                                numValues
-                        );
+                    readDecimalListVector(
+                        (DecimalColumnVector) mapVector,
+                        offset,
+                        numValues
+                    );
                 break;
             case TIMESTAMP:
                 mapList =
-                        readTimestampListVector(
-                                (TimestampColumnVector) mapVector,
-                                childType,
-                                offset,
-                                numValues
-                        );
+                    readTimestampListVector(
+                        (TimestampColumnVector) mapVector,
+                        childType,
+                        offset,
+                        numValues
+                    );
                 break;
             default:
                 throw new UnsupportedOperationException(mapVector.type.name() + " is not supported for MapColumnVectors");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 94bd6603a..6241a815c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,10 +29,6 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>connector-file-ftp</artifactId>
 
-    <properties>
-        <connector.name>file.ftp</connector.name>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index dc59fe9fe..db9d29cac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -29,20 +29,12 @@
 
     <artifactId>connector-file-hadoop</artifactId>
 
-    <properties>
-        <connector.name>file.hadoop</connector.name>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-file-base</artifactId>
+            <artifactId>connector-file-base-hadoop</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-hadoop-2</artifactId>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index a484a6345..2e619819f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,23 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseFileSink {
+public class HdfsFileSink extends BaseHdfsFileSink {
 
     @Override
     public String getPluginName() {
@@ -42,11 +35,6 @@ public class HdfsFileSink extends BaseFileSink {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
-        if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
-        }
         super.prepare(pluginConfig);
-        hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 45298c8b0..943a189e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -19,25 +19,14 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
-import java.io.IOException;
-
 @AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseFileSource {
+public class HdfsFileSource extends BaseHdfsFileSource {
 
     @Override
     public String getPluginName() {
@@ -46,31 +35,6 @@ public class HdfsFileSource extends BaseFileSource {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HdfsSourceConfig.FILE_PATH, HdfsSourceConfig.FILE_TYPE, HdfsSourceConfig.DEFAULT_FS);
-        if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
-        }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE));
-        String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH);
-        hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS));
-        try {
-            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
-        } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
-        }
-        // support user-defined schema
-        if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
-            Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
-            rowType = SeaTunnelSchema
-                    .buildWithConfig(schemaConfig)
-                    .getSeaTunnelRowType();
-            readStrategy.setSeaTunnelRowTypeInfo(rowType);
-        } else {
-            try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
-            }
-        }
+        super.prepare(pluginConfig);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 93f000fcb..8632a26cc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -29,10 +29,6 @@
 
     <artifactId>connector-file-local</artifactId>
 
-    <properties>
-        <connector.name>file.local</connector.name>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
index 4e2ca4d83..0a0ae910f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
@@ -30,7 +30,6 @@
     <artifactId>connector-file-oss</artifactId>
     <properties>
         <hadoop-aliyun.version>2.9.2</hadoop-aliyun.version>
-        <connector.name>file.oss</connector.name>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index f71d739f9..f71bf8266 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -29,12 +29,17 @@
     <artifactId>connector-file</artifactId>
     <packaging>pom</packaging>
 
+    <properties>
+        <connector.name>connector.file</connector.name>
+    </properties>
+
     <modules>
         <module>connector-file-base</module>
         <module>connector-file-hadoop</module>
         <module>connector-file-local</module>
         <module>connector-file-oss</module>
         <module>connector-file-ftp</module>
+        <module>connector-file-base-hadoop</module>
     </modules>
 
     <build>
@@ -55,6 +60,10 @@
                                     <!--suppress UnresolvedMavenProperty, this property is added by submodule-->
                                     <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.orc</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.parquet</pattern>
                                     <!--suppress UnresolvedMavenProperty -->
@@ -70,7 +79,6 @@
                     </execution>
                 </executions>
             </plugin>
-
             <!-- make sure that flatten runs after maven-shade-plugin -->
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml b/seatunnel-connectors-v2/connector-hive/pom.xml
index 39c6f6429..e9f28c41a 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -31,12 +31,13 @@
     
     <properties>
         <hive.metastore.version>2.3.9</hive.metastore.version>
+        <connector.name>connector.hive</connector.name>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-file-hadoop</artifactId>
+            <artifactId>connector-file-base-hadoop</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <exclusion>
@@ -52,8 +53,9 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
-            <artifactId>hive-metastore</artifactId>
+            <artifactId>hive-exec</artifactId>
             <version>${hive.metastore.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>log4j</artifactId>
@@ -83,8 +85,55 @@
                     <artifactId>jdk.tools</artifactId>
                     <groupId>jdk.tools</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                    <groupId>org.pentaho</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>
-
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <!--suppress UnresolvedMavenProperty, this property is added by submodule-->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.orc</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.parquet</pattern>
+                                    <!--suppress UnresolvedMavenProperty -->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>shaded.parquet</pattern>
+                                    <!--suppress UnresolvedMavenProperty -->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- make sure that flatten runs after maven-shade-plugin -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>flatten-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 9adae1158..c56beedcb 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -36,7 +36,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
@@ -57,7 +57,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSink.class)
-public class HiveSink extends BaseFileSink {
+public class HiveSink extends BaseHdfsFileSink {
     private String dbName;
     private String tableName;
     private Table tableInformation;
@@ -104,8 +104,8 @@ public class HiveSink extends BaseFileSink {
         if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
             pluginConfig = pluginConfig.withValue(FILE_FORMAT, ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
-                    .withValue(FIELD_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
-                    .withValue(ROW_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
+                .withValue(FIELD_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+                .withValue(ROW_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
         } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             pluginConfig = pluginConfig.withValue(FILE_FORMAT, ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
@@ -114,8 +114,8 @@ public class HiveSink extends BaseFileSink {
             throw new RuntimeException("Only support [text parquet orc] file now");
         }
         pluginConfig = pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
-                .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
-                .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
+            .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
+            .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
 
         if (!pluginConfig.hasPath(SAVE_MODE) || StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
             pluginConfig = pluginConfig.withValue(SAVE_MODE, ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 92173892e..3789a139d 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -43,7 +43,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 @AutoService(SeaTunnelSource.class)
-public class HiveSource extends HdfsFileSource {
+public class HiveSource extends BaseHdfsFileSource {
     private Table tableInformation;
 
     @Override
@@ -75,7 +75,7 @@ public class HiveSource extends HdfsFileSource {
             String path = uri.getPath();
             String defaultFs = hdfsLocation.replace(path, "");
             pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_PATH, ConfigValueFactory.fromAnyRef(path))
-                    .withValue(FS_DEFAULT_NAME_KEY, ConfigValueFactory.fromAnyRef(defaultFs));
+                .withValue(FS_DEFAULT_NAME_KEY, ConfigValueFactory.fromAnyRef(defaultFs));
         } catch (URISyntaxException e) {
             throw new RuntimeException("Get hdfs cluster address failed, please check.", e);
         }