You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/27 02:00:46 UTC
[incubator-seatunnel] branch dev updated: [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 59db97ed3 [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)
59db97ed3 is described below
commit 59db97ed3460819145f32737342cb31dea6188db
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Jul 27 10:00:40 2022 +0800
[Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)
* [Connector-V2][connector-hive] Add parquet read strategy in hive connector source
* [Connector-V2][connector-hive] Optimize the process of generating file read strategies
---
seatunnel-connectors-v2/connector-hive/pom.xml | 6 +
.../seatunnel/hive/source/HiveSource.java | 11 +-
.../file/reader/format/ParquetReadStrategy.java | 156 +++++++++++++++++++++
.../file/reader/format/ReadStrategyFactory.java | 38 +++++
.../hive/source/file/reader/type/FileTypeEnum.java | 48 +++++++
5 files changed, 251 insertions(+), 8 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml b/seatunnel-connectors-v2/connector-hive/pom.xml
index a4842e0ed..ac6596bbe 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -75,6 +75,12 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index ebaf2c51b..039d9b4c7 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -34,9 +34,8 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategyFactory;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -69,12 +68,8 @@ public class HiveSource implements SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
- // default filetype is text
- if ("orc".equals(pluginConfig.getString(SourceConfig.FILE_TYPE))) {
- readStrategy = new OrcReadStrategy();
- } else {
- readStrategy = new TextReadStrategy();
- }
+ // use factory to generate readStrategy
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SourceConfig.FILE_TYPE));
String path = pluginConfig.getString(FILE_PATH);
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
try {
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
new file mode 100644
index 000000000..2895c933f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Read strategy for parquet files. Records are read through the Avro parquet reader and
+ * every column is emitted as a string (see {@link #getSeaTunnelRowTypeInfo}).
+ */
+@Slf4j
+public class ParquetReadStrategy extends AbstractReadStrategy {
+
+    /** Shared JSON serializer; creating an ObjectMapper per value is needlessly expensive. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /** Largest decimal precision whose unscaled value is decoded through a {@code long}. */
+    private static final int MAX_LONG_PRECISION = 18;
+
+    /** Row type built by {@link #getSeaTunnelRowTypeInfo}; {@link #read} relies on it being set. */
+    private SeaTunnelRowType seaTunnelRowType;
+
+    /**
+     * Reads every record of the parquet file and emits one {@link SeaTunnelRow} per record.
+     * Each field is converted to a string; a null column value is emitted as a null field.
+     *
+     * @param path   location of the parquet file
+     * @param output collector that receives the rows
+     * @throws Exception if the file cannot be opened or read
+     */
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        Path filePath = new Path(path);
+        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+        int fieldsCount = seaTunnelRowType.getTotalFields();
+        GenericRecord record;
+        try (ParquetReader<GenericData.Record> reader = AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
+            while ((record = reader.read()) != null) {
+                Object[] fields = new Object[fieldsCount];
+                for (int i = 0; i < fieldsCount; i++) {
+                    fields[i] = convertField(record.get(i));
+                }
+                output.collect(new SeaTunnelRow(fields));
+            }
+        }
+    }
+
+    /**
+     * Converts a single column value to its string form.
+     *
+     * @param data raw value taken from the record, may be null
+     * @return the string form, or null when {@code data} is null
+     */
+    private String convertField(Object data) {
+        if (data == null) {
+            // nullable columns must not abort the whole read
+            return null;
+        }
+        try {
+            if (data instanceof GenericData.Fixed) {
+                // fixed values are decoded as decimals
+                return fixed2String((GenericData.Fixed) data);
+            }
+            if (data instanceof ArrayList) {
+                // array values are rendered as a JSON array of strings
+                @SuppressWarnings("unchecked")
+                ArrayList<GenericData.Record> records = (ArrayList<GenericData.Record>) data;
+                return array2String(records);
+            }
+        } catch (Exception e) {
+            // best effort: keep the row and fall back to the value's own string form
+            log.debug("Convert parquet value of type [{}] failed, use toString() instead", data.getClass().getName(), e);
+        }
+        return data.toString();
+    }
+
+    /**
+     * Builds the row type from the parquet footer schema and caches it in this instance.
+     * Field names come from the schema; every field is declared as a string type.
+     *
+     * @param hadoopConf hadoop settings used to build the configuration
+     * @param path       location of the parquet file
+     * @return the row type describing the file
+     * @throws HivePluginException if the footer cannot be read
+     */
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws HivePluginException {
+        if (seaTunnelRowType != null) {
+            return seaTunnelRowType;
+        }
+        Configuration configuration = getConfiguration(hadoopConf);
+        Path filePath = new Path(path);
+        ParquetMetadata metadata;
+        try {
+            metadata = ParquetFileReader.readFooter(configuration, filePath);
+        } catch (IOException e) {
+            throw new HivePluginException("Create parquet reader failed", e);
+        }
+        FileMetaData fileMetaData = metadata.getFileMetaData();
+        MessageType schema = fileMetaData.getSchema();
+        int fieldCount = schema.getFieldCount();
+        String[] fields = new String[fieldCount];
+        SeaTunnelDataType<?>[] types = new SeaTunnelDataType<?>[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = schema.getFieldName(i);
+            // For now every field is exposed as a string;
+            // the schema information could later be used to build precise SeaTunnel column types.
+            types[i] = BasicType.STRING_TYPE;
+        }
+        seaTunnelRowType = new SeaTunnelRowType(fields, types);
+        return seaTunnelRowType;
+    }
+
+    /**
+     * Decodes a fixed value as a decimal, using the "precision" and "scale" properties
+     * of its schema, and returns the decimal's string form.
+     */
+    private String fixed2String(GenericData.Fixed fixed) {
+        Schema schema = fixed.getSchema();
+        byte[] bytes = fixed.bytes();
+        int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
+        int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
+        BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
+        return bigDecimal.toString();
+    }
+
+    /**
+     * Interprets the bytes as a big-endian two's-complement unscaled value and applies the scale.
+     * The scale is applied exactly; no floating point arithmetic is involved.
+     *
+     * @param bytesArray unscaled value bytes
+     * @param precision  declared decimal precision
+     * @param scale      declared decimal scale
+     * @return the decoded decimal
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int scale) {
+        Binary value = Binary.fromConstantByteArray(bytesArray);
+        if (precision <= MAX_LONG_PRECISION && bytesArray.length <= Long.BYTES) {
+            ByteBuffer buffer = value.toByteBuffer();
+            byte[] bytes = buffer.array();
+            int start = buffer.arrayOffset() + buffer.position();
+            int end = buffer.arrayOffset() + buffer.limit();
+            long unscaled = 0L;
+            for (int i = start; i < end; i++) {
+                unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+            }
+            // sign-extend from the actual width of the value to 64 bits
+            int bits = 8 * (end - start);
+            long signExtended = (unscaled << (64 - bits)) >> (64 - bits);
+            return BigDecimal.valueOf(signExtended, scale);
+        }
+        return new BigDecimal(new BigInteger(value.getBytes()), scale);
+    }
+
+    /**
+     * Renders an array value as a JSON array of strings, taking the first field of each element record.
+     * A null first field is written as JSON null.
+     */
+    private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
+        List<String> values = data.stream()
+            .map(record -> {
+                Object element = record.get(0);
+                return element == null ? null : element.toString();
+            })
+            .collect(Collectors.toList());
+        return OBJECT_MAPPER.writeValueAsString(values);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
new file mode 100644
index 000000000..56e88aa44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type.FileTypeEnum;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Static factory that maps a configured file type name to its {@link ReadStrategy}.
+ */
+@Slf4j
+public class ReadStrategyFactory {
+
+    private ReadStrategyFactory() {}
+
+    /**
+     * Returns the read strategy for the given file type name.
+     * The name is matched case-insensitively against the {@link FileTypeEnum} constants;
+     * a null or unknown name is logged and handled with a {@link TextReadStrategy}.
+     *
+     * @param fileType configured file type name, may be null
+     * @return a new read strategy instance
+     */
+    public static ReadStrategy of(String fileType) {
+        for (FileTypeEnum candidate : FileTypeEnum.values()) {
+            // equalsIgnoreCase is locale independent and simply returns false for null
+            if (candidate.name().equalsIgnoreCase(fileType)) {
+                return candidate.getReadStrategy();
+            }
+        }
+        log.warn("Hive plugin not support this file type [{}], it will be treated as a plain text file", fileType);
+        return new TextReadStrategy();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
new file mode 100644
index 000000000..3cf986b18
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
+
+/**
+ * File types the hive source can read. Each constant creates its own {@link ReadStrategy}.
+ */
+public enum FileTypeEnum {
+    ORC {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new OrcReadStrategy();
+        }
+    },
+    PARQUET {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new ParquetReadStrategy();
+        }
+    },
+    TEXT {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    };
+
+    /**
+     * Creates a new read strategy for this file type.
+     * Declared abstract so that every constant is forced by the compiler to provide one,
+     * instead of inheriting a default that returns null.
+     *
+     * @return a new read strategy instance
+     */
+    public abstract ReadStrategy getReadStrategy();
+}