Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/23 10:01:57 UTC
[incubator-seatunnel] branch dev updated: [Fix][Connector-V2] Fix HiveSource connector error when reading ORC tables (#2845)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 61720306e [Fix][Connector-V2] Fix HiveSource connector error when reading ORC tables (#2845)
61720306e is described below
commit 61720306e7d24136fa7a688ff563b242fa36b593
Author: Eric <ga...@gmail.com>
AuthorDate: Fri Sep 23 18:01:50 2022 +0800
[Fix][Connector-V2] Fix HiveSource connector error when reading ORC tables (#2845)
* Fix ORC read in Flink
* Fix license header
* Exclude orc-core from flink-orc and from seatunnel-core-base
* Only connector-file/pom.xml needs to add the shade plugin and the connector.name property
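The crux of the fix is visible in the diff below: the connectors switch from the Hive-bundled vector classes to the nohive classifier of orc-core, which relocates org.apache.hadoop.hive.ql.exec.vector.* to org.apache.orc.storage.ql.exec.vector.* and keeps the ORC reader/writer off the Hive/Flink classpath. A minimal sketch of writing one row against the relocated API follows; the class name, schema, and output path are hypothetical examples rather than code from this commit, though the writer options mirror those in OrcWriteStrategy.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.nio.charset.StandardCharsets;

public class NohiveOrcWriteSketch {
    public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
        // Same writer options the commit uses in OrcWriteStrategy:
        // snappy compression and ORC file version 0.12
        OrcFile.WriterOptions options = OrcFile.writerOptions(new Configuration())
                .setSchema(schema)
                .compress(CompressionKind.SNAPPY)
                .version(OrcFile.Version.V_0_12)
                .overwrite(true);
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"), options);
        // Column vectors now come from org.apache.orc.storage, not org.apache.hadoop.hive
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector id = (LongColumnVector) batch.cols[0];
        BytesColumnVector name = (BytesColumnVector) batch.cols[1];
        int row = batch.size++;
        id.vector[row] = 1L;
        name.setVal(row, "seatunnel".getBytes(StandardCharsets.UTF_8));
        writer.addRowBatch(batch);
        writer.close();
    }
}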
---
.../pom.xml | 24 ++++--
.../file/hdfs/sink/BaseHdfsFileSink.java} | 12 +--
.../file/hdfs/source/BaseHdfsFileSource.java} | 16 +---
.../file/hdfs/source/config/HdfsSourceConfig.java | 0
.../connector-file/connector-file-base/pom.xml | 31 +++----
.../file/sink/writer/OrcWriteStrategy.java | 24 +++---
.../file/source/reader/OrcReadStrategy.java | 98 +++++++++++-----------
.../connector-file/connector-file-ftp/pom.xml | 4 -
.../connector-file/connector-file-hadoop/pom.xml | 10 +--
.../seatunnel/file/hdfs/sink/HdfsFileSink.java | 14 +---
.../seatunnel/file/hdfs/source/HdfsFileSource.java | 40 +--------
.../connector-file/connector-file-local/pom.xml | 4 -
.../connector-file/connector-file-oss/pom.xml | 1 -
seatunnel-connectors-v2/connector-file/pom.xml | 10 ++-
seatunnel-connectors-v2/connector-hive/pom.xml | 55 +++++++++++-
.../connectors/seatunnel/hive/sink/HiveSink.java | 12 +--
.../seatunnel/hive/source/HiveSource.java | 6 +-
17 files changed, 174 insertions(+), 187 deletions(-)
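As the file list shows, the HDFS-specific prepare logic moves into the new connector-file-base-hadoop module: BaseHdfsFileSink and BaseHdfsFileSource become abstract bases that own the config checks, while the concrete plugins (and HiveSink/HiveSource in the Hive connector) only register themselves and supply a plugin name. A simplified sketch of the resulting sink shape, condensed from the HdfsFileSink diff below:

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;

// The concrete plugin is now only service registration plus a plugin name;
// the fs.defaultFS validation lives in the abstract BaseHdfsFileSink.
@AutoService(SeaTunnelSink.class)
public class HdfsFileSink extends BaseHdfsFileSink {

    @Override
    public String getPluginName() {
        return FileSystemType.HDFS.getFileSystemPluginName();
    }

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        super.prepare(pluginConfig);
    }
}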
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
similarity index 71%
copy from seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
index 93f000fcb..e8664068f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/pom.xml
@@ -27,11 +27,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-file-local</artifactId>
-
- <properties>
- <connector.name>file.local</connector.name>
- </properties>
+ <artifactId>connector-file-base-hadoop</artifactId>
<dependencies>
<dependency>
@@ -42,7 +38,25 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <!-- make sure that flatten runs after maven-shade-plugin -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>flatten-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
similarity index 83%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index a484a6345..e8cbad443 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -20,25 +20,15 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseFileSink {
-
- @Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
- }
+public abstract class BaseHdfsFileSink extends BaseFileSink {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
similarity index 86%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 45298c8b0..329ac60f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,12 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
@@ -32,17 +30,9 @@ import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseFileSource {
-
- @Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
- }
+public abstract class BaseHdfsFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
@@ -62,8 +52,8 @@ public class HdfsFileSource extends BaseFileSource {
if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
readStrategy.setSeaTunnelRowTypeInfo(rowType);
} else {
try {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
similarity index 100%
rename from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
rename to seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfig.java
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index f2e425e6e..bc6d7b301 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -28,7 +28,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>connector-file-base</artifactId>
-
+
<properties>
<commons-net.version>3.6</commons-net.version>
<orc.version>1.5.6</orc.version>
@@ -37,7 +37,7 @@
<flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
<parquet-avro.version>1.12.3</parquet-avro.version>
</properties>
-
+
<dependencyManagement>
<dependencies>
<dependency>
@@ -67,6 +67,12 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-base</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
<scope>test</scope>
</dependency>
@@ -92,11 +98,16 @@
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
+ <classifier>nohive</classifier>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>hadoop-hdfs</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
</exclusions>
</dependency>
@@ -116,7 +127,6 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
</dependency>
-
</dependencies>
<build>
@@ -124,18 +134,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <!-- base module need skip shading -->
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
</plugin>
<!-- make sure that flatten runs after maven-shade-plugin -->
<plugin>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 875504e9b..0b9ed40aa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -24,15 +24,15 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkCo
import lombok.NonNull;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import java.io.IOException;
import java.math.BigInteger;
@@ -91,12 +91,12 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
Path path = new Path(filePath);
try {
OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
- .setSchema(schema)
- // temporarily used snappy
- .compress(CompressionKind.SNAPPY)
- // use orc version 0.12
- .version(OrcFile.Version.V_0_12)
- .overwrite(true);
+ .setSchema(schema)
+ // temporarily used snappy
+ .compress(CompressionKind.SNAPPY)
+ // use orc version 0.12
+ .version(OrcFile.Version.V_0_12)
+ .overwrite(true);
Writer newWriter = OrcFile.createWriter(path, options);
this.beingWrittenWriter.put(filePath, newWriter);
return newWriter;
@@ -170,7 +170,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
if (value instanceof Boolean) {
Boolean bool = (Boolean) value;
longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
- } else if (value instanceof Integer) {
+ } else if (value instanceof Integer) {
longVector.vector[row] = ((Integer) value).longValue();
} else if (value instanceof Long) {
longVector.vector[row] = (Long) value;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 3ebffb5a1..5b946b769 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -36,22 +36,22 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import java.io.IOException;
import java.math.BigDecimal;
@@ -381,15 +381,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
ColumnVector.Type keyType = mapVector.keys.type;
ColumnVector.Type valueType = mapVector.values.type;
return
- keyType == ColumnVector.Type.BYTES ||
- keyType == ColumnVector.Type.LONG ||
- keyType == ColumnVector.Type.DOUBLE
- &&
- valueType == ColumnVector.Type.LONG ||
- valueType == ColumnVector.Type.DOUBLE ||
- valueType == ColumnVector.Type.BYTES ||
- valueType == ColumnVector.Type.DECIMAL ||
- valueType == ColumnVector.Type.TIMESTAMP;
+ keyType == ColumnVector.Type.BYTES ||
+ keyType == ColumnVector.Type.LONG ||
+ keyType == ColumnVector.Type.DOUBLE
+ &&
+ valueType == ColumnVector.Type.LONG ||
+ valueType == ColumnVector.Type.DOUBLE ||
+ valueType == ColumnVector.Type.BYTES ||
+ valueType == ColumnVector.Type.DECIMAL ||
+ valueType == ColumnVector.Type.TIMESTAMP;
}
private Object[] readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
@@ -397,47 +397,47 @@ public class OrcReadStrategy extends AbstractReadStrategy {
switch (mapVector.type) {
case BYTES:
mapList =
- readBytesListVector(
- (BytesColumnVector) mapVector,
- childType,
- offset,
- numValues
- );
+ readBytesListVector(
+ (BytesColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
break;
case LONG:
mapList =
- readLongListVector(
- (LongColumnVector) mapVector,
- childType,
- offset,
- numValues
- );
+ readLongListVector(
+ (LongColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
break;
case DOUBLE:
mapList =
- readDoubleListVector(
- (DoubleColumnVector) mapVector,
- childType,
- offset,
- numValues
- );
+ readDoubleListVector(
+ (DoubleColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
break;
case DECIMAL:
mapList =
- readDecimalListVector(
- (DecimalColumnVector) mapVector,
- offset,
- numValues
- );
+ readDecimalListVector(
+ (DecimalColumnVector) mapVector,
+ offset,
+ numValues
+ );
break;
case TIMESTAMP:
mapList =
- readTimestampListVector(
- (TimestampColumnVector) mapVector,
- childType,
- offset,
- numValues
- );
+ readTimestampListVector(
+ (TimestampColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
break;
default:
throw new UnsupportedOperationException(mapVector.type.name() + " is not supported for MapColumnVectors");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 94bd6603a..6241a815c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,10 +29,6 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>connector-file-ftp</artifactId>
- <properties>
- <connector.name>file.ftp</connector.name>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index dc59fe9fe..db9d29cac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -29,20 +29,12 @@
<artifactId>connector-file-hadoop</artifactId>
- <properties>
- <connector.name>file.hadoop</connector.name>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-file-base</artifactId>
+ <artifactId>connector-file-base-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-hadoop-2</artifactId>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index a484a6345..2e619819f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,23 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseFileSink {
+public class HdfsFileSink extends BaseHdfsFileSink {
@Override
public String getPluginName() {
@@ -42,11 +35,6 @@ public class HdfsFileSink extends BaseFileSink {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
- if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
- }
super.prepare(pluginConfig);
- hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 45298c8b0..943a189e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -19,25 +19,14 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
-import java.io.IOException;
-
@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseFileSource {
+public class HdfsFileSource extends BaseHdfsFileSource {
@Override
public String getPluginName() {
@@ -46,31 +35,6 @@ public class HdfsFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HdfsSourceConfig.FILE_PATH, HdfsSourceConfig.FILE_TYPE, HdfsSourceConfig.DEFAULT_FS);
- if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
- }
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE));
- String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH);
- hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS));
- try {
- filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
- } catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
- }
- // support user-defined schema
- if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
- Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
- rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
- } else {
- try {
- rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
- }
- }
+ super.prepare(pluginConfig);
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 93f000fcb..8632a26cc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -29,10 +29,6 @@
<artifactId>connector-file-local</artifactId>
- <properties>
- <connector.name>file.local</connector.name>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
index 4e2ca4d83..0a0ae910f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
@@ -30,7 +30,6 @@
<artifactId>connector-file-oss</artifactId>
<properties>
<hadoop-aliyun.version>2.9.2</hadoop-aliyun.version>
- <connector.name>file.oss</connector.name>
</properties>
<dependencies>
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index f71d739f9..f71bf8266 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -29,12 +29,17 @@
<artifactId>connector-file</artifactId>
<packaging>pom</packaging>
+ <properties>
+ <connector.name>connector.file</connector.name>
+ </properties>
+
<modules>
<module>connector-file-base</module>
<module>connector-file-hadoop</module>
<module>connector-file-local</module>
<module>connector-file-oss</module>
<module>connector-file-ftp</module>
+ <module>connector-file-base-hadoop</module>
</modules>
<build>
@@ -55,6 +60,10 @@
<!--suppress UnresolvedMavenProperty, this property is added by submodule-->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.orc</pattern>
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.parquet</pattern>
<!--suppress UnresolvedMavenProperty -->
@@ -70,7 +79,6 @@
</execution>
</executions>
</plugin>
-
<!-- make sure that flatten runs after maven-shade-plugin -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml b/seatunnel-connectors-v2/connector-hive/pom.xml
index 39c6f6429..e9f28c41a 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -31,12 +31,13 @@
<properties>
<hive.metastore.version>2.3.9</hive.metastore.version>
+ <connector.name>connector.hive</connector.name>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-file-hadoop</artifactId>
+ <artifactId>connector-file-base-hadoop</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
@@ -52,8 +53,9 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-metastore</artifactId>
+ <artifactId>hive-exec</artifactId>
<version>${hive.metastore.version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
@@ -83,8 +85,55 @@
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ <groupId>org.pentaho</groupId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <!--suppress UnresolvedMavenProperty, this property is added by submodule-->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.orc</pattern>
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.parquet</pattern>
+ <!--suppress UnresolvedMavenProperty -->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>shaded.parquet</pattern>
+ <!--suppress UnresolvedMavenProperty -->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- make sure that flatten runs after maven-shade-plugin -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>flatten-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 9adae1158..c56beedcb 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -36,7 +36,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
@@ -57,7 +57,7 @@ import java.util.Map;
import java.util.stream.Collectors;
@AutoService(SeaTunnelSink.class)
-public class HiveSink extends BaseFileSink {
+public class HiveSink extends BaseHdfsFileSink {
private String dbName;
private String tableName;
private Table tableInformation;
@@ -104,8 +104,8 @@ public class HiveSink extends BaseFileSink {
if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
pluginConfig = pluginConfig.withValue(FILE_FORMAT, ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
- .withValue(FIELD_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
- .withValue(ROW_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
+ .withValue(FIELD_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+ .withValue(ROW_DELIMITER, ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
} else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
pluginConfig = pluginConfig.withValue(FILE_FORMAT, ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
@@ -114,8 +114,8 @@ public class HiveSink extends BaseFileSink {
throw new RuntimeException("Only support [text parquet orc] file now");
}
pluginConfig = pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
- .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
- .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
+ .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
+ .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
if (!pluginConfig.hasPath(SAVE_MODE) || StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
pluginConfig = pluginConfig.withValue(SAVE_MODE, ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 92173892e..3789a139d 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -43,7 +43,7 @@ import java.net.URI;
import java.net.URISyntaxException;
@AutoService(SeaTunnelSource.class)
-public class HiveSource extends HdfsFileSource {
+public class HiveSource extends BaseHdfsFileSource {
private Table tableInformation;
@Override
@@ -75,7 +75,7 @@ public class HiveSource extends HdfsFileSource {
String path = uri.getPath();
String defaultFs = hdfsLocation.replace(path, "");
pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_PATH, ConfigValueFactory.fromAnyRef(path))
- .withValue(FS_DEFAULT_NAME_KEY, ConfigValueFactory.fromAnyRef(defaultFs));
+ .withValue(FS_DEFAULT_NAME_KEY, ConfigValueFactory.fromAnyRef(defaultFs));
} catch (URISyntaxException e) {
throw new RuntimeException("Get hdfs cluster address failed, please check.", e);
}