Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/27 02:00:46 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 59db97ed3 [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)
59db97ed3 is described below

commit 59db97ed3460819145f32737342cb31dea6188db
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Jul 27 10:00:40 2022 +0800

    [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237)
    
    * [Connector-V2][connector-hive] Add parquet read strategy in hive connector source
    
    * [Connector-V2][connector-hive] Optimize the process of generating file read strategies
---
 seatunnel-connectors-v2/connector-hive/pom.xml     |   6 +
 .../seatunnel/hive/source/HiveSource.java          |  11 +-
 .../file/reader/format/ParquetReadStrategy.java    | 156 +++++++++++++++++++++
 .../file/reader/format/ReadStrategyFactory.java    |  38 +++++
 .../hive/source/file/reader/type/FileTypeEnum.java |  48 +++++++
 5 files changed, 251 insertions(+), 8 deletions(-)
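
In short, this change replaces the hard-coded if/else in HiveSource with a factory lookup keyed on the configured file type. A minimal sketch of the new resolution flow, using only class and method names added in this commit (see the diffs below):

    // file_type string -> FileTypeEnum constant -> ReadStrategy instance
    ReadStrategy strategy = ReadStrategyFactory.of("parquet"); // ParquetReadStrategy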

diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml b/seatunnel-connectors-v2/connector-hive/pom.xml
index a4842e0ed..ac6596bbe 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -75,6 +75,12 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index ebaf2c51b..039d9b4c7 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -34,9 +34,8 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategyFactory;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -69,12 +68,8 @@ public class HiveSource implements SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        // default filetype is text
-        if ("orc".equals(pluginConfig.getString(SourceConfig.FILE_TYPE))) {
-            readStrategy = new OrcReadStrategy();
-        } else {
-            readStrategy = new TextReadStrategy();
-        }
+        // use the factory to create the read strategy for the configured file type
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SourceConfig.FILE_TYPE));
         String path = pluginConfig.getString(FILE_PATH);
         hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
         try {
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
new file mode 100644
index 000000000..2895c933f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ParquetReadStrategy extends AbstractReadStrategy {
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        Path filePath = new Path(path);
+        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+        int fieldsCount = seaTunnelRowType.getTotalFields();
+        GenericRecord record;
+        try (ParquetReader<GenericData.Record> reader = AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
+            while ((record = reader.read()) != null) {
+                Object[] fields = new Object[fieldsCount];
+                for (int i = 0; i < fieldsCount; i++) {
+                    Object data = record.get(i);
+                    try {
+                        if (data instanceof GenericData.Fixed) {
+                            // check whether the upstream data is a decimal type
+                            data = fixed2String((GenericData.Fixed) data);
+                        } else if (data instanceof ArrayList) {
+                            // check whether the upstream data is an array type
+                            data = array2String((ArrayList<GenericData.Record>) data);
+                        }
+                    } catch (Exception e) {
+                        data = record.get(i);
+                    } finally {
+                        fields[i] = data.toString();
+                    }
+                }
+                output.collect(new SeaTunnelRow(fields));
+            }
+        }
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws HivePluginException {
+        if (seaTunnelRowType != null) {
+            return seaTunnelRowType;
+        }
+        Configuration configuration = getConfiguration(hadoopConf);
+        Path filePath = new Path(path);
+        ParquetMetadata metadata;
+        try {
+            metadata = ParquetFileReader.readFooter(configuration, filePath);
+        } catch (IOException e) {
+            throw new HivePluginException("Failed to create parquet reader", e);
+        }
+        FileMetaData fileMetaData = metadata.getFileMetaData();
+        MessageType schema = fileMetaData.getSchema();
+        int fieldCount = schema.getFieldCount();
+        String[] fields = new String[fieldCount];
+        SeaTunnelDataType[] types = new SeaTunnelDataType[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = schema.getFieldName(i);
+            // Temporarily, each field is treated as a string type.
+            // TODO: use the Parquet schema information to build the SeaTunnel column types.
+            types[i] = BasicType.STRING_TYPE;
+        }
+        seaTunnelRowType = new SeaTunnelRowType(fields, types);
+        return seaTunnelRowType;
+    }
+
+    private String fixed2String(GenericData.Fixed fixed) {
+        Schema schema = fixed.getSchema();
+        byte[] bytes = fixed.bytes();
+        int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
+        int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
+        BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
+        return bigDecimal.toString();
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int scale) {
+        Binary value = Binary.fromConstantByteArray(bytesArray);
+        if (precision <= 18) {
+            ByteBuffer buffer = value.toByteBuffer();
+            byte[] bytes = buffer.array();
+            int start = buffer.arrayOffset() + buffer.position();
+            int end = buffer.arrayOffset() + buffer.limit();
+            long unscaled = 0L;
+            int i = start;
+            while (i < end) {
+                unscaled = unscaled << 8 | bytes[i] & 0xff;
+                i++;
+            }
+            int bits = 8 * (end - start);
+            long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+            if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >= Math.pow(10, 18)) {
+                return new BigDecimal(unscaledNew);
+            } else {
+                return BigDecimal.valueOf(unscaledNew / Math.pow(10, scale));
+            }
+        } else {
+            return new BigDecimal(new BigInteger(value.getBytes()), scale);
+        }
+    }
+
+    private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
+        return objectMapper.writeValueAsString(values);
+    }
+}
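
A note on bytes2Decimal above: the precision <= 18 fast path rebuilds the unscaled value from big-endian two's-complement bytes, but then applies the scale via Math.pow, i.e. double arithmetic. The else branch's BigInteger route is exact for any precision, so a hedged simplification (illustrative only, not part of this commit) is to use it unconditionally:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    public class DecimalDecodeSketch {
        // BigInteger(byte[]) interprets big-endian two's complement, and
        // BigDecimal(BigInteger, int) applies the scale exactly, with no
        // double arithmetic involved.
        static BigDecimal decode(byte[] unscaledBytes, int scale) {
            return new BigDecimal(new BigInteger(unscaledBytes), scale);
        }

        public static void main(String[] args) {
            // 0x04D2 is 1234; with scale 2 this prints 12.34
            System.out.println(decode(new byte[]{0x04, (byte) 0xD2}, 2));
            // 0xFB2E is -1234 in two's complement; this prints -12.34
            System.out.println(decode(new byte[]{(byte) 0xFB, (byte) 0x2E}, 2));
        }
    }
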
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
new file mode 100644
index 000000000..56e88aa44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type.FileTypeEnum;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ReadStrategyFactory {
+
+    private ReadStrategyFactory() {}
+
+    public static ReadStrategy of(String fileType) {
+        try {
+            FileTypeEnum fileTypeEnum = FileTypeEnum.valueOf(fileType.toUpperCase());
+            return fileTypeEnum.getReadStrategy();
+        } catch (IllegalArgumentException e) {
+            log.warn("Hive plugin does not support file type [{}]; it will be treated as a plain text file", fileType);
+            return new TextReadStrategy();
+        }
+    }
+}
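
The factory's contract, as written above: the lookup is case-insensitive (the input is upper-cased before FileTypeEnum.valueOf), and an unrecognized type logs a warning and falls back to plain text instead of failing. For illustration:

    ReadStrategy orc = ReadStrategyFactory.of("orc");         // OrcReadStrategy
    ReadStrategy parquet = ReadStrategyFactory.of("PARQUET"); // ParquetReadStrategy
    ReadStrategy fallback = ReadStrategyFactory.of("csv");    // warns, returns TextReadStrategy
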
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
new file mode 100644
index 000000000..3cf986b18
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
+
+public enum FileTypeEnum {
+    ORC {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new OrcReadStrategy();
+        }
+    },
+    PARQUET {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new ParquetReadStrategy();
+        }
+    },
+    TEXT {
+        @Override
+        public ReadStrategy getReadStrategy() {
+            return new TextReadStrategy();
+        }
+    };
+
+    public ReadStrategy getReadStrategy() {
+        return null;
+    }
+}
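
One design note on FileTypeEnum: each constant overrides getReadStrategy(), and the base implementation returns null. Since every constant supplies a body, the same contract can be expressed with an abstract method, turning a missing override into a compile error rather than a latent null. A hedged sketch of that variant (imports as in the file above; not what this commit does):

    public enum FileTypeEnum {
        ORC {
            @Override
            public ReadStrategy getReadStrategy() {
                return new OrcReadStrategy();
            }
        },
        PARQUET {
            @Override
            public ReadStrategy getReadStrategy() {
                return new ParquetReadStrategy();
            }
        },
        TEXT {
            @Override
            public ReadStrategy getReadStrategy() {
                return new TextReadStrategy();
            }
        };

        // abstract: the compiler forces every constant to supply a strategy
        public abstract ReadStrategy getReadStrategy();
    }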