You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/15 15:47:03 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1829ddc66 [Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)
1829ddc66 is described below
commit 1829ddc6627b1e8a6fd3ff422805ca3b287ecb67
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Mon Aug 15 23:46:58 2022 +0800
[Feature][Connector-V2] Add base source connector code for connector-file-base (#2399)
---
.../connector-file/connector-file-base/pom.xml | 9 +
.../{FileFormat.java => BaseSourceConfig.java} | 21 +-
.../seatunnel/file/config/FileFormat.java | 48 +-
.../config/{FileFormat.java => HadoopConf.java} | 20 +-
.../FilePluginException.java} | 22 +-
.../seatunnel/file/source/BaseFileSource.java | 65 +++
.../file/source/BaseFileSourceReader.java | 92 ++++
.../file/source/reader/AbstractReadStrategy.java | 86 ++++
.../file/source/reader/OrcReadStrategy.java | 542 +++++++++++++++++++++
.../file/source/reader/ParquetReadStrategy.java | 186 +++++++
.../seatunnel/file/source/reader/ReadStrategy.java | 42 ++
.../reader/ReadStrategyFactory.java} | 28 +-
.../file/source/reader/TextReadStrategy.java | 56 +++
.../split/FileSourceSplit.java} | 23 +-
.../source/split/FileSourceSplitEnumerator.java | 126 +++++
.../state/FileSourceState.java} | 24 +-
16 files changed, 1296 insertions(+), 94 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 05e98573e..13173c14e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -30,11 +30,13 @@
<artifactId>connector-file-base</artifactId>
<dependencies>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
@@ -73,5 +75,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
similarity index 71%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index f9ca34409..16f1bada9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -17,22 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
-import java.io.Serializable;
-
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
-
- private String suffix;
-
- private FileFormat(String suffix) {
- this.suffix = suffix;
- }
-
- public String getSuffix() {
- return "." + suffix;
- }
+/**
+ * Configuration keys shared by the file source connectors.
+ */
+public class BaseSourceConfig {
+    /** Key under which the path to read is configured. */
+    public static final String FILE_PATH = "path";
+
+    /** Key under which the file type is configured. */
+    public static final String FILE_TYPE = "type";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index f9ca34409..790e7ca64 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,22 +17,56 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
+
import java.io.Serializable;
+/**
+ * Supported file formats. Each constant knows its file-name suffix and which
+ * {@link ReadStrategy} reads it; CSV, TEXT and JSON currently share {@link TextReadStrategy}.
+ */
public enum FileFormat implements Serializable {
-    CSV("csv"),
-    TEXT("txt"),
-    PARQUET("parquet"),
-    ORC("orc"),
-    JSON("json");
+    CSV("csv") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    },
+    TEXT("txt") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    },
+    PARQUET("parquet") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new ParquetReadStrategy();
+        }
+    },
+    ORC("orc") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new OrcReadStrategy();
+        }
+    },
+    JSON("json") {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    };
-    private String suffix;
+    private final String suffix;
-    private FileFormat(String suffix) {
+    FileFormat(String suffix) {
        this.suffix = suffix;
    }
    public String getSuffix() {
        return "." + suffix;
    }
+
+    /**
+     * Creates a new {@link ReadStrategy} for files of this format (a new instance per call).
+     *
+     * <p>Declared abstract because every constant overrides it: a constant added later without
+     * an override now fails to compile instead of silently yielding {@code null}.
+     *
+     * @return a fresh read strategy, never {@code null}
+     */
+    public abstract ReadStrategy getReadStrategy();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
similarity index 74%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index f9ca34409..f3bbedf08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -17,22 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import lombok.Data;
+
import java.io.Serializable;
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
+/**
+ * Serializable holder for the Hadoop settings a file read strategy needs.
+ *
+ * <p>Lombok {@code @Data} generates the accessors; because an explicit constructor is declared,
+ * there is no no-arg constructor.
+ */
+@Data
+public class HadoopConf implements Serializable {
- private String suffix;
+ // Applied as CommonConfigurationKeys.FS_DEFAULT_NAME_KEY by AbstractReadStrategy, i.e. the
+ // default file system URI (presumably something like hdfs://host:port -- confirm with callers).
+ private String hdfsNameKey;
- private FileFormat(String suffix) {
- this.suffix = suffix;
- }
+ // Class name applied to "fs.hdfs.impl"; defaults to Hadoop's DistributedFileSystem.
+ private String fsHdfsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";
- public String getSuffix() {
- return "." + suffix;
+ /**
+ * @param hdfsNameKey default file system URI used when building the Hadoop configuration
+ */
+ public HadoopConf(String hdfsNameKey) {
+ this.hdfsNameKey = hdfsNameKey;
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
similarity index 67%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
index f9ca34409..e747c6aea 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
@@ -15,24 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.exception;
-import java.io.Serializable;
-
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
-
- private String suffix;
-
- private FileFormat(String suffix) {
- this.suffix = suffix;
+/**
+ * Checked exception raised by the file connector when a file cannot be inspected or read;
+ * for example, the ORC read strategy wraps reader-creation {@code IOException}s in it.
+ */
+public class FilePluginException extends Exception {
+ /** @param message description of the failure */
+ public FilePluginException(String message) {
+ super(message);
}
- public String getSuffix() {
- return "." + suffix;
+ /**
+ * @param message description of the failure
+ * @param cause underlying exception, preserved for the stack trace
+ */
+ public FilePluginException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
new file mode 100644
index 000000000..02d1e024a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+
+import java.util.List;
+
+/**
+ * Skeleton for bounded file sources.
+ *
+ * <p>Concrete connectors are expected to assign {@link #rowType}, {@link #readStrategy},
+ * {@link #hadoopConf} and {@link #filePaths} (presumably while preparing the plugin -- confirm
+ * in subclasses) before readers or enumerators are requested.
+ */
+public abstract class BaseFileSource implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit, FileSourceState> {
+    protected SeaTunnelRowType rowType;
+    protected ReadStrategy readStrategy;
+    protected HadoopConf hadoopConf;
+    protected List<String> filePaths;
+
+    /** Builds a fresh enumerator over {@link #filePaths}. */
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) throws Exception {
+        return new FileSourceSplitEnumerator(enumeratorContext, this.filePaths);
+    }
+
+    /** Rebuilds an enumerator over {@link #filePaths} from a checkpointed state. */
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext, FileSourceState checkpointState) throws Exception {
+        return new FileSourceSplitEnumerator(enumeratorContext, this.filePaths, checkpointState);
+    }
+
+    /** Creates a reader that reads assigned splits with {@link #readStrategy}. */
+    @Override
+    public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+        return new BaseFileSourceReader(this.readStrategy, this.hadoopConf, readerContext);
+    }
+
+    /** Files are a finite input, so this source is always bounded. */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /** Row type assigned by the concrete source; {@code null} until a subclass sets it. */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
new file mode 100644
index 000000000..fa13ec66c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link SourceReader} that reads every assigned {@link FileSourceSplit} through a shared
+ * {@link ReadStrategy}; each split's id is passed to {@link ReadStrategy#read} as the path.
+ *
+ * <p>Splits are removed once fully read, so {@link #snapshotState} reports only outstanding work
+ * and a repeated poll does not re-read finished files. A reader that is told no more splits will
+ * arrive finishes even if it was never assigned any.
+ */
+public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSourceSplit> {
+    /** Back-off while waiting for splits to be assigned. */
+    private static final long THREAD_WAIT_TIME = 500L;
+    private final ReadStrategy readStrategy;
+    private final HadoopConf hadoopConf;
+    private final SourceReader.Context context;
+    /** Splits assigned to this reader that have not been fully read yet. */
+    private final Set<FileSourceSplit> sourceSplits;
+    /** Set by {@link #handleNoMoreSplits()} once no further splits will be assigned. */
+    private boolean noMoreSplits;
+
+    public BaseFileSourceReader(ReadStrategy readStrategy, HadoopConf hadoopConf, SourceReader.Context context) {
+        this.readStrategy = readStrategy;
+        this.hadoopConf = hadoopConf;
+        this.context = context;
+        this.sourceSplits = new HashSet<>();
+    }
+
+    @Override
+    public void open() throws Exception {
+        readStrategy.init(hadoopConf);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // This reader holds no resources of its own.
+    }
+
+    /**
+     * Reads all currently assigned splits and then signals end of input.
+     *
+     * <p>With no splits assigned it either finishes (no more splits coming) or backs off for
+     * {@link #THREAD_WAIT_TIME} ms and returns.
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            if (noMoreSplits) {
+                // Nothing was (or will be) assigned: finish instead of idling forever.
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(THREAD_WAIT_TIME);
+            }
+            return;
+        }
+        // Iterate over a copy so each split can be dropped right after it has been read;
+        // a failing split stays in the set and therefore in the next snapshot.
+        for (FileSourceSplit split : new ArrayList<>(sourceSplits)) {
+            try {
+                readStrategy.read(split.splitId(), output);
+            } catch (Exception e) {
+                throw new RuntimeException("File source read error", e);
+            }
+            sourceSplits.remove(split);
+        }
+        context.signalNoMoreElement();
+    }
+
+    /** Returns the splits that still have to be read. */
+    @Override
+    public List<FileSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<FileSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplits = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
new file mode 100644
index 000000000..eba6d4b21
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base {@link ReadStrategy} that stores the {@link HadoopConf} passed to {@link #init} and
+ * provides shared helpers for building a Hadoop {@link Configuration} and listing input files.
+ */
+@Slf4j
+public abstract class AbstractReadStrategy implements ReadStrategy {
+    /** Set by {@link #init}; {@code null} means plain Hadoop defaults (e.g. local files). */
+    HadoopConf hadoopConf;
+
+    @Override
+    public void init(HadoopConf conf) {
+        this.hadoopConf = conf;
+    }
+
+    /**
+     * Builds a Hadoop configuration from the given settings.
+     *
+     * @param hadoopConf settings to apply; when {@code null} a default configuration is returned
+     * @return a new configuration instance
+     */
+    @Override
+    public Configuration getConfiguration(HadoopConf hadoopConf) {
+        Configuration configuration = new Configuration();
+        if (hadoopConf == null) {
+            return configuration;
+        }
+        // Configuration#set rejects null values, so only apply the settings that are present.
+        if (hadoopConf.getHdfsNameKey() != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+        }
+        if (hadoopConf.getFsHdfsImpl() != null) {
+            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+        }
+        return configuration;
+    }
+
+    /**
+     * Builds a configuration from the {@link HadoopConf} given to {@link #init}.
+     *
+     * <p>{@link FilePluginException} stays declared for source compatibility with callers that
+     * catch it, although this implementation never throws it.
+     */
+    Configuration getConfiguration() throws FilePluginException {
+        if (null == hadoopConf) {
+            // Debug level: this runs for every file read/type check, so info would be noisy.
+            log.debug("Local file reader didn't need hadoopConf");
+        }
+        return getConfiguration(hadoopConf);
+    }
+
+    /** Hook for subclasses to validate a file's format; the base implementation accepts every file. */
+    boolean checkFileType(String path) {
+        return true;
+    }
+
+    /**
+     * Recursively lists all regular files below {@code path}, skipping {@code _SUCCESS} marker files.
+     *
+     * @param hadoopConf settings used to reach the file system; may be {@code null}
+     * @param path file or directory to list
+     * @return fully qualified paths of the files found
+     * @throws IOException if the file system cannot be reached or listed
+     */
+    @Override
+    public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
+        Configuration configuration = getConfiguration(hadoopConf);
+        List<String> fileNames = new ArrayList<>();
+        Path listFiles = new Path(path);
+        // Resolve the file system from the path's own scheme (falling back to the default FS when
+        // it has none) so e.g. file:// paths still work when the default FS is HDFS.
+        FileSystem fileSystem = listFiles.getFileSystem(configuration);
+        for (FileStatus fileStatus : fileSystem.listStatus(listFiles)) {
+            if (fileStatus.isDirectory()) {
+                fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
+            } else if (fileStatus.isFile() && !"_SUCCESS".equals(fileStatus.getPath().getName())) {
+                fileNames.add(fileStatus.getPath().toString());
+            }
+        }
+        return fileNames;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
new file mode 100644
index 000000000..209965b63
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class OrcReadStrategy extends AbstractReadStrategy {
+
+ private SeaTunnelRowType seaTunnelRowTypeInfo;
+ private static final long MIN_SIZE = 16 * 1024;
+
+    /**
+     * Reads every row of the ORC file at {@code path} and emits it as a {@link SeaTunnelRow}.
+     *
+     * @throws FilePluginException if the file does not look like an ORC file
+     * @throws Exception on any I/O or decoding failure
+     */
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        if (!checkFileType(path)) {
+            throw new FilePluginException("Please check file type");
+        }
+        Configuration configuration = getConfiguration();
+        Path filePath = new Path(path);
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(configuration);
+        // Both the file reader and its row iterator hold open streams; close both.
+        try (Reader reader = OrcFile.createReader(filePath, readerOptions);
+             RecordReader rows = reader.rows()) {
+            TypeDescription schema = reader.getSchema();
+            List<TypeDescription> children = schema.getChildren();
+            VectorizedRowBatch rowBatch = schema.createRowBatch();
+            while (rows.nextBatch(rowBatch)) {
+                int numCols = rowBatch.numCols;
+                ColumnVector[] cols = rowBatch.cols;
+                for (int i = 0; i < rowBatch.size; i++) {
+                    Object[] fields = new Object[numCols];
+                    for (int j = 0; j < numCols; j++) {
+                        fields[j] = cols[j] == null ? null : readColumn(cols[j], children.get(j), i);
+                    }
+                    output.collect(new SeaTunnelRow(fields));
+                }
+            }
+        }
+    }
+
+    /**
+     * Derives (and caches on first success) the row type from the ORC schema's top-level field names.
+     *
+     * @throws FilePluginException if the ORC file cannot be opened
+     */
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+        if (seaTunnelRowTypeInfo != null) {
+            return seaTunnelRowTypeInfo;
+        }
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(getConfiguration(hadoopConf));
+        try (Reader reader = OrcFile.createReader(new Path(path), readerOptions)) {
+            List<String> fieldNames = reader.getSchema().getFieldNames();
+            int fieldCount = fieldNames.size();
+            String[] fields = fieldNames.toArray(new String[fieldCount]);
+            SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
+            // NOTE(review): every column is declared STRING, yet read() emits Integer, Long, Double,
+            // BigDecimal, Timestamp, List, Map, ... values -- confirm downstream tolerates this.
+            Arrays.fill(types, BasicType.STRING_TYPE);
+            seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
+            return seaTunnelRowTypeInfo;
+        } catch (IOException e) {
+            throw new FilePluginException("Create OrcReader Fail", e);
+        }
+    }
+
+    /**
+     * Checks whether {@code path} looks like an ORC file by finding the ORC magic at the end of the
+     * postscript, or (for ORC 0.11.0 files) at the start of the file.
+     *
+     * @return {@code false} for files that are too small or carry no magic
+     * @throws RuntimeException wrapping any I/O failure while probing the file
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    boolean checkFileType(String path) {
+        try {
+            Configuration configuration = getConfiguration();
+            Path filePath = new Path(path);
+            FileSystem fileSystem = filePath.getFileSystem(configuration);
+            int len = OrcFile.MAGIC.length();
+            long size = fileSystem.getFileStatus(filePath).getLen();
+            // Too small to even hold the magic: not ORC. This also prevents reading byte -1 of an
+            // empty buffer (or using a negative offset) below.
+            if (size <= len) {
+                return false;
+            }
+            // try-with-resources: the stream must be closed on every exit path, including errors.
+            try (FSDataInputStream in = fileSystem.open(filePath)) {
+                // try to get Postscript in orc file; its length is stored in the last byte
+                int readSize = (int) Math.min(size, MIN_SIZE);
+                in.seek(size - readSize);
+                ByteBuffer buffer = ByteBuffer.allocate(readSize);
+                in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                int psLen = buffer.get(readSize - 1) & 0xff;
+                if (psLen < len + 1) {
+                    return false;
+                }
+                // The magic sits immediately before the trailing postscript-length byte.
+                int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
+                if (Text.decode(buffer.array(), offset, len).equals(OrcFile.MAGIC)) {
+                    return true;
+                }
+                // If it isn't there, this may be the 0.11.0 version of ORC.
+                // Read the first 3 bytes of the file to check for the header
+                in.seek(0);
+                byte[] header = new byte[len];
+                in.readFully(header, 0, len);
+                // if it isn't there, this isn't an ORC file
+                return Text.decode(header, 0, len).equals(OrcFile.MAGIC);
+            }
+        } catch (FilePluginException | IOException e) {
+            String errorMsg = String.format("Check orc file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    /**
+     * Converts the value at {@code rowNum} of an ORC column vector into a Java object,
+     * dispatching on the vector's physical type.
+     *
+     * @return the converted value, or {@code null} when the cell is null
+     * @throws RuntimeException for vector types that are not handled
+     */
+    private Object readColumn(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        if (colVec.isNull[rowNum]) {
+            return null;
+        }
+        switch (colVec.type) {
+            case LONG:
+                return readLongVal(colVec, colType, rowNum);
+            case DOUBLE:
+                return ((DoubleColumnVector) colVec).vector[rowNum];
+            case BYTES:
+                return readBytesVal(colVec, rowNum);
+            case DECIMAL:
+                return readDecimalVal(colVec, rowNum);
+            case TIMESTAMP:
+                return readTimestampVal(colVec, colType, rowNum);
+            case STRUCT:
+                return readStructVal(colVec, colType, rowNum);
+            case LIST:
+                return readListVal(colVec, colType, rowNum);
+            case MAP:
+                return readMapVal(colVec, colType, rowNum);
+            case UNION:
+                return readUnionVal(colVec, colType, rowNum);
+            default:
+                throw new RuntimeException(
+                        "ReadColumn: unsupported ORC file column type: " + colVec.type.name());
+        }
+    }
+
+    /**
+     * Reads a value backed by a {@link LongColumnVector}, narrowing it according to the logical
+     * ORC type: INT becomes {@code Integer}, BOOLEAN becomes {@code Boolean}, DATE becomes a
+     * {@code java.sql.Date}, and anything else stays {@code Long}.
+     *
+     * @return the converted value, or {@code null} when the cell is null
+     */
+    private Object readLongVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        Object colObj = null;
+        if (!colVec.isNull[rowNum]) {
+            LongColumnVector longVec = (LongColumnVector) colVec;
+            long longVal = longVec.vector[rowNum];
+            colObj = longVal;
+            if (colType.getCategory() == TypeDescription.Category.INT) {
+                colObj = (int) longVal;
+            } else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
+                colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
+            } else if (colType.getCategory() == TypeDescription.Category.DATE) {
+                // ORC stores DATE as days since 1970-01-01, not milliseconds. java.sql.Date (a
+                // java.util.Date subclass) anchors that calendar day at local midnight.
+                colObj = java.sql.Date.valueOf(java.time.LocalDate.ofEpochDay(longVal));
+            }
+        }
+        return colObj;
+    }
+
+    /**
+     * Reads a {@link BytesColumnVector} cell as a string.
+     *
+     * @return the cell text, or {@code null} when the cell is null
+     */
+    private Object readBytesVal(ColumnVector colVec, int rowNum) {
+        if (colVec.isNull[rowNum]) {
+            return null;
+        }
+        return ((BytesColumnVector) colVec).toString(rowNum);
+    }
+
+    /**
+     * Reads a {@link DecimalColumnVector} cell as a {@code BigDecimal}.
+     *
+     * @return the decimal value, or {@code null} when the cell is null
+     */
+    private Object readDecimalVal(ColumnVector colVec, int rowNum) {
+        if (colVec.isNull[rowNum]) {
+            return null;
+        }
+        DecimalColumnVector decimals = (DecimalColumnVector) colVec;
+        return decimals.vector[rowNum].getHiveDecimal().bigDecimalValue();
+    }
+
+    /**
+     * Reads a {@link TimestampColumnVector} cell as a {@link Timestamp} (millis plus nanos), or as a
+     * {@link Date} when the logical type is DATE.
+     *
+     * @return the converted value, or {@code null} when the cell is null
+     */
+    private Object readTimestampVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        if (colVec.isNull[rowNum]) {
+            return null;
+        }
+        TimestampColumnVector vector = (TimestampColumnVector) colVec;
+        Timestamp timestamp = new Timestamp(vector.time[rowNum]);
+        timestamp.setNanos(vector.nanos[rowNum]);
+        if (colType.getCategory() == TypeDescription.Category.DATE) {
+            return new Date(timestamp.getTime());
+        }
+        return timestamp;
+    }
+
+    /**
+     * Reads a {@link StructColumnVector} cell as a list holding one converted value per field,
+     * in field order.
+     *
+     * @return the field values, or {@code null} when the cell is null
+     */
+    private Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        if (colVec.isNull[rowNum]) {
+            return null;
+        }
+        ColumnVector[] children = ((StructColumnVector) colVec).fields;
+        List<TypeDescription> childTypes = colType.getChildren();
+        List<Object> values = new ArrayList<>(children.length);
+        for (int idx = 0; idx < children.length; idx++) {
+            values.add(readColumn(children[idx], childTypes.get(idx), rowNum));
+        }
+        return values;
+    }
+
+    /**
+     * Reads the map stored at {@code rowNum} of a {@link MapColumnVector} into a {@link HashMap}.
+     *
+     * @throws RuntimeException if the key/value vector types are rejected by checkMapColumnVectorTypes
+     */
+    private Object readMapVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+        MapColumnVector mapVector = (MapColumnVector) colVec;
+        if (!checkMapColumnVectorTypes(mapVector)) {
+            throw new RuntimeException("readMapVal: unsupported key or value types");
+        }
+        int entryCount = (int) mapVector.lengths[rowNum];
+        int start = (int) mapVector.offsets[rowNum];
+        List<TypeDescription> entryTypes = colType.getChildren();
+        TypeDescription keyType = entryTypes.get(0);
+        TypeDescription valueType = entryTypes.get(1);
+        List<Object> keys = readMapVector(mapVector.keys, keyType, start, entryCount);
+        List<Object> values = readMapVector(mapVector.values, valueType, start, entryCount);
+        Map<Object, Object> result = new HashMap<>();
+        for (int idx = 0; idx < keys.size(); idx++) {
+            result.put(keys.get(idx), values.get(idx));
+        }
+        return result;
+    }
+
+    /**
+     * Checks that both the key and value vectors of a map use a physical type that readMapVector
+     * can decode (BYTES, LONG, DOUBLE, DECIMAL or TIMESTAMP).
+     *
+     * <p>The previous single expression mixed {@code ||} and {@code &&} without parentheses.
+     * {@code &&} binds tighter, so only DOUBLE keys were combined with the first value test, and any
+     * BYTES/LONG key (or most value types) short-circuited to {@code true} regardless of the other side.
+     */
+    private boolean checkMapColumnVectorTypes(MapColumnVector mapVector) {
+        return isSupportedMapChildType(mapVector.keys.type)
+                && isSupportedMapChildType(mapVector.values.type);
+    }
+
+    /** Whether readMapVector has a branch for vectors of the given physical type. */
+    private static boolean isSupportedMapChildType(ColumnVector.Type type) {
+        return type == ColumnVector.Type.BYTES
+                || type == ColumnVector.Type.LONG
+                || type == ColumnVector.Type.DOUBLE
+                || type == ColumnVector.Type.DECIMAL
+                || type == ColumnVector.Type.TIMESTAMP;
+    }
+
+ @SuppressWarnings("unchecked")
+ private List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
+ List<Object> mapList;
+ switch (mapVector.type) {
+ case BYTES:
+ mapList =
+ (List<Object>) readBytesListVector(
+ (BytesColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
+ break;
+ case LONG:
+ mapList =
+ readLongListVector(
+ (LongColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
+ break;
+ case DOUBLE:
+ mapList =
+ (List<Object>) readDoubleListVector(
+ (DoubleColumnVector) mapVector,
+ offset,
+ numValues
+ );
+ break;
+ case DECIMAL:
+ mapList =
+ (List<Object>) readDecimalListVector(
+ (DecimalColumnVector) mapVector,
+ offset,
+ numValues
+ );
+ break;
+ case TIMESTAMP:
+ mapList =
+ (List<Object>) readTimestampListVector(
+ (TimestampColumnVector) mapVector,
+ childType,
+ offset,
+ numValues
+ );
+ break;
+ default:
+ throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
+ }
+ return mapList;
+ }
+
+ private Object readUnionVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+ Pair<TypeDescription, Object> columnValuePair;
+ UnionColumnVector unionVector = (UnionColumnVector) colVec;
+ int tagVal = unionVector.tags[rowNum];
+ List<TypeDescription> unionFieldTypes = colType.getChildren();
+ if (tagVal < unionFieldTypes.size()) {
+ TypeDescription fieldType = unionFieldTypes.get(tagVal);
+ if (tagVal < unionVector.fields.length) {
+ ColumnVector fieldVector = unionVector.fields[tagVal];
+ Object unionValue = readColumn(fieldVector, fieldType, rowNum);
+ columnValuePair = Pair.of(fieldType, unionValue);
+ } else {
+ throw new RuntimeException(
+ "readUnionVal: union tag value out of range for union column vectors"
+ );
+ }
+ } else {
+ throw new RuntimeException(
+ "readUnionVal: union tag value out of range for union types"
+ );
+ }
+ return columnValuePair;
+ }
+
+ private Object readListVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
+ Object listValues = null;
+ if (!colVec.isNull[rowNum]) {
+ ListColumnVector listVector = (ListColumnVector) colVec;
+ ColumnVector listChildVector = listVector.child;
+ TypeDescription childType = colType.getChildren().get(0);
+ switch (listChildVector.type) {
+ case LONG:
+ listValues = readLongListValues(listVector, childType, rowNum);
+ break;
+ case DOUBLE:
+ listValues = readDoubleListValues(listVector, rowNum);
+ break;
+ case BYTES:
+ listValues = readBytesListValues(listVector, childType, rowNum);
+ break;
+ case DECIMAL:
+ listValues = readDecimalListValues(listVector, rowNum);
+ break;
+ case TIMESTAMP:
+ listValues = readTimestampListValues(listVector, childType, rowNum);
+ break;
+ default:
+ throw new RuntimeException(
+ listVector.type.name() + " is not supported for ListColumnVectors"
+ );
+ }
+ }
+ return listValues;
+ }
+
+ private Object readLongListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+ int offset = (int) listVector.offsets[rowNum];
+ int numValues = (int) listVector.lengths[rowNum];
+ LongColumnVector longVector = (LongColumnVector) listVector.child;
+ return readLongListVector(longVector, childType, offset, numValues);
+ }
+
+ private List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
+ List<Object> longList = new ArrayList<>();
+ for (int i = 0; i < numValues; i++) {
+ if (!longVector.isNull[offset + i]) {
+ long longVal = longVector.vector[offset + i];
+ if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
+ Boolean boolVal = longVal == 0 ? Boolean.valueOf(false) : Boolean.valueOf(true);
+ longList.add(boolVal);
+ } else if (childType.getCategory() == TypeDescription.Category.INT) {
+ Integer intObj = (int) longVal;
+ longList.add(intObj);
+ } else {
+ longList.add(longVal);
+ }
+ } else {
+ longList.add(null);
+ }
+ }
+ return longList;
+ }
+
+ private Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
+ int offset = (int) listVector.offsets[rowNum];
+ int numValues = (int) listVector.lengths[rowNum];
+ DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
+ return readDoubleListVector(doubleVec, offset, numValues);
+ }
+
+ private Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
+ List<Object> doubleList = new ArrayList<>();
+ for (int i = 0; i < numValues; i++) {
+ if (!doubleVec.isNull[offset + i]) {
+ Double doubleVal = doubleVec.vector[offset + i];
+ doubleList.add(doubleVal);
+ } else {
+ doubleList.add(null);
+ }
+ }
+ return doubleList;
+ }
+
+ private Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+ int offset = (int) listVector.offsets[rowNum];
+ int numValues = (int) listVector.lengths[rowNum];
+ BytesColumnVector bytesVec = (BytesColumnVector) listVector.child;
+ return readBytesListVector(bytesVec, childType, offset, numValues);
+ }
+
+ private Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
+ List<Object> bytesValList = new ArrayList<>();
+ for (int i = 0; i < numValues; i++) {
+ if (!bytesVec.isNull[offset + i]) {
+ byte[] byteArray = bytesVec.vector[offset + i];
+ int vecLen = bytesVec.length[offset + i];
+ int vecStart = bytesVec.start[offset + i];
+ byte[] vecCopy = Arrays.copyOfRange(byteArray, vecStart, vecStart + vecLen);
+ if (childType.getCategory() == TypeDescription.Category.STRING) {
+ String str = new String(vecCopy);
+ bytesValList.add(str);
+ } else {
+ bytesValList.add(vecCopy);
+ }
+ } else {
+ bytesValList.add(null);
+ }
+ }
+ return bytesValList;
+ }
+
+ private Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
+ int offset = (int) listVector.offsets[rowNum];
+ int numValues = (int) listVector.lengths[rowNum];
+ DecimalColumnVector decimalVec = (DecimalColumnVector) listVector.child;
+ return readDecimalListVector(decimalVec, offset, numValues);
+ }
+
+ private Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
+ List<Object> decimalList = new ArrayList<>();
+ for (int i = 0; i < numValues; i++) {
+ if (!decimalVector.isNull[offset + i]) {
+ BigDecimal bigDecimal = decimalVector.vector[i].getHiveDecimal().bigDecimalValue();
+ decimalList.add(bigDecimal);
+ } else {
+ decimalList.add(null);
+ }
+ }
+ return decimalList;
+ }
+
+ private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
+ int offset = (int) listVector.offsets[rowNum];
+ int numValues = (int) listVector.lengths[rowNum];
+ TimestampColumnVector timestampVec = (TimestampColumnVector) listVector.child;
+ return readTimestampListVector(timestampVec, childType, offset, numValues);
+ }
+
+ private Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
+ List<Object> timestampList = new ArrayList<>();
+ for (int i = 0; i < numValues; i++) {
+ if (!timestampVector.isNull[offset + i]) {
+ int nanos = timestampVector.nanos[offset + i];
+ long millis = timestampVector.time[offset + i];
+ Timestamp timestamp = new Timestamp(millis);
+ timestamp.setNanos(nanos);
+ if (childType.getCategory() == TypeDescription.Category.DATE) {
+ Date date = new Date(timestamp.getTime());
+ timestampList.add(date);
+ } else {
+ timestampList.add(timestamp);
+ }
+ } else {
+ timestampList.add(null);
+ }
+ }
+ return timestampList;
+ }
+}
+
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
new file mode 100644
index 000000000..82e05beba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ParquetReadStrategy extends AbstractReadStrategy {
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+ if (Boolean.FALSE.equals(checkFileType(path))) {
+ throw new Exception("please check file type");
+ }
+ Path filePath = new Path(path);
+ HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+ int fieldsCount = seaTunnelRowType.getTotalFields();
+ GenericRecord record;
+ try (ParquetReader<GenericData.Record> reader = AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
+ while ((record = reader.read()) != null) {
+ Object[] fields = new Object[fieldsCount];
+ for (int i = 0; i < fieldsCount; i++) {
+ Object data = record.get(i);
+ try {
+ if (data instanceof GenericData.Fixed) {
+ // judge the data in upstream is or not decimal type
+ data = fixed2String((GenericData.Fixed) data);
+ } else if (data instanceof ArrayList) {
+ // judge the data in upstream is or not array type
+ data = array2String((ArrayList<GenericData.Record>) data);
+ }
+ } catch (Exception e) {
+ data = record.get(i);
+ } finally {
+ fields[i] = data.toString();
+ }
+ }
+ output.collect(new SeaTunnelRow(fields));
+ }
+ }
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+ if (seaTunnelRowType != null) {
+ return seaTunnelRowType;
+ }
+ Configuration configuration = getConfiguration(hadoopConf);
+ Path filePath = new Path(path);
+ ParquetMetadata metadata;
+ try {
+ metadata = ParquetFileReader.readFooter(configuration, filePath);
+ } catch (IOException e) {
+ throw new FilePluginException("Create parquet reader failed", e);
+ }
+ FileMetaData fileMetaData = metadata.getFileMetaData();
+ MessageType schema = fileMetaData.getSchema();
+ int fieldCount = schema.getFieldCount();
+ String[] fields = new String[fieldCount];
+ SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
+ for (int i = 0; i < fieldCount; i++) {
+ fields[i] = schema.getFieldName(i);
+ // Temporarily each field is treated as a string type
+ // I think we can use the schema information to build seatunnel column type
+ types[i] = BasicType.STRING_TYPE;
+ }
+ seaTunnelRowType = new SeaTunnelRowType(fields, types);
+ return seaTunnelRowType;
+ }
+
+ private String fixed2String(GenericData.Fixed fixed) {
+ Schema schema = fixed.getSchema();
+ byte[] bytes = fixed.bytes();
+ int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
+ int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
+ BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
+ return bigDecimal.toString();
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int scale) {
+ Binary value = Binary.fromConstantByteArray(bytesArray);
+ if (precision <= 18) {
+ ByteBuffer buffer = value.toByteBuffer();
+ byte[] bytes = buffer.array();
+ int start = buffer.arrayOffset() + buffer.position();
+ int end = buffer.arrayOffset() + buffer.limit();
+ long unscaled = 0L;
+ int i = start;
+ while (i < end) {
+ unscaled = unscaled << 8 | bytes[i] & 0xff;
+ i++;
+ }
+ int bits = 8 * (end - start);
+ long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+ if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >= Math.pow(10, 18)) {
+ return new BigDecimal(unscaledNew);
+ } else {
+ return BigDecimal.valueOf(unscaledNew / Math.pow(10, scale));
+ }
+ } else {
+ return new BigDecimal(new BigInteger(value.getBytes()), scale);
+ }
+ }
+
+ @Override
+ boolean checkFileType(String path) {
+ boolean checkResult;
+ byte[] magic = new byte[PARQUET_MAGIC.length];
+ try {
+ Configuration configuration = getConfiguration();
+ FileSystem fileSystem = FileSystem.get(configuration);
+ Path filePath = new Path(path);
+ FSDataInputStream in = fileSystem.open(filePath);
+ // try to get header information in a parquet file
+ in.seek(0);
+ in.readFully(magic);
+ checkResult = Arrays.equals(magic, PARQUET_MAGIC);
+ in.close();
+ return checkResult;
+ } catch (FilePluginException | IOException e) {
+ String errorMsg = String.format("Check parquet file [%s] error", path);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
+ return objectMapper.writeValueAsString(values);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
new file mode 100644
index 000000000..1114eab65
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+public interface ReadStrategy extends Serializable {
+ void init(HadoopConf conf);
+
+ Configuration getConfiguration(HadoopConf conf);
+
+ void read(String path, Collector<SeaTunnelRow> output) throws Exception;
+
+ SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException;
+
+ List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
similarity index 53%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index f9ca34409..f12f2c2dc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
+import lombok.extern.slf4j.Slf4j;
- private String suffix;
+@Slf4j
+public class ReadStrategyFactory {
- private FileFormat(String suffix) {
- this.suffix = suffix;
- }
+ private ReadStrategyFactory() {}
- public String getSuffix() {
- return "." + suffix;
+ public static ReadStrategy of(String fileType) {
+ try {
+ FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
+ return fileFormat.getReadStrategy();
+ } catch (IllegalArgumentException e) {
+ String errorMsg = String.format("File source connector not support this file type [%s], please check your config", fileType);
+ throw new RuntimeException(errorMsg);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
new file mode 100644
index 000000000..61e6ede77
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+public class TextReadStrategy extends AbstractReadStrategy {
+
+ private static final String TEXT_FIELD_NAME = "lines";
+
+ @Override
+ public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
+ Configuration conf = getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ Path filePath = new Path(path);
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+ reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+ }
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
+ return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
+ new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
similarity index 67%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
index f9ca34409..50161e717 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
@@ -15,24 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
-import java.io.Serializable;
+import org.apache.seatunnel.api.source.SourceSplit;
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
+public class FileSourceSplit implements SourceSplit {
+ private final String splitId;
- private String suffix;
-
- private FileFormat(String suffix) {
- this.suffix = suffix;
+ public FileSourceSplit(String splitId) {
+ this.splitId = splitId;
}
- public String getSuffix() {
- return "." + suffix;
+ @Override
+ public String splitId() {
+ return this.splitId;
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
new file mode 100644
index 000000000..fa3cebd28
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
+ private final Context<FileSourceSplit> context;
+ private Set<FileSourceSplit> pendingSplit;
+ private Set<FileSourceSplit> assignedSplit;
+ private final List<String> filePaths;
+
+ public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
+ this.context = context;
+ this.filePaths = filePaths;
+ }
+
+ public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths,
+ FileSourceState sourceState) {
+ this(context, filePaths);
+ this.assignedSplit = sourceState.getAssignedSplit();
+ }
+
+ @Override
+ public void open() {
+ this.assignedSplit = new HashSet<>();
+ this.pendingSplit = new HashSet<>();
+ }
+
+ @Override
+ public void run() {
+ pendingSplit = getHiveFileSplit();
+ assignSplit(context.registeredReaders());
+ }
+
+ private Set<FileSourceSplit> getHiveFileSplit() {
+ Set<FileSourceSplit> hiveSourceSplits = new HashSet<>();
+ filePaths.forEach(k -> hiveSourceSplits.add(new FileSourceSplit(k)));
+ return hiveSourceSplits;
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+ if (!splits.isEmpty()) {
+ pendingSplit.addAll(splits);
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ private void assignSplit(Collection<Integer> taskIDList) {
+ Map<Integer, List<FileSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
+ for (int taskID : taskIDList) {
+ readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+ }
+
+ pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size()))
+ .add(s));
+ readySplit.forEach(context::assignSplit);
+ assignedSplit.addAll(pendingSplit);
+ pendingSplit.clear();
+ }
+
+ private static int getSplitOwner(String tp, int numReaders) {
+ return tp.hashCode() % numReaders;
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ if (!pendingSplit.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ @Override
+ public FileSourceState snapshotState(long checkpointId) {
+ return new FileSourceState(assignedSplit);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
similarity index 60%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
index f9ca34409..582618dac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/state/FileSourceState.java
@@ -15,24 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.source.state;
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
-public enum FileFormat implements Serializable {
- CSV("csv"),
- TEXT("txt"),
- PARQUET("parquet"),
- ORC("orc"),
- JSON("json");
+import java.io.Serializable;
+import java.util.Set;
- private String suffix;
+public class FileSourceState implements Serializable {
+ private static final long serialVersionUID = 9208369906513934611L;
+ private final Set<FileSourceSplit> assignedSplit;
- private FileFormat(String suffix) {
- this.suffix = suffix;
+ public FileSourceState(Set<FileSourceSplit> assignedSplit) {
+ this.assignedSplit = assignedSplit;
}
- public String getSuffix() {
- return "." + suffix;
+ public Set<FileSourceSplit> getAssignedSplit() {
+ return assignedSplit;
}
}