Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/28 03:00:09 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2] Add file type check logic in hive connector (#2275)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5488337c6 [Connector-V2] Add file type check logic in hive connector (#2275)
5488337c6 is described below

commit 5488337c6710c43fdd1ba5e069160e918761721b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Jul 28 11:00:03 2022 +0800

    [Connector-V2] Add file type check logic in hive connector (#2275)
---
 .../source/file/reader/format/OrcReadStrategy.java | 45 ++++++++++++++++++++++
 .../file/reader/format/ParquetReadStrategy.java    | 29 ++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
index 325c1aacc..96ff1e983 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -46,6 +48,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
 
@@ -53,6 +56,7 @@ import java.util.Properties;
 public class OrcReadStrategy extends AbstractReadStrategy {
 
     private SeaTunnelRowType seaTunnelRowTypeInfo;
+    private static final long MIN_SIZE = 16 * 1024;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
@@ -119,5 +123,46 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return seaTunnelRowTypeInfo;
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    boolean checkFileType(String path) {
+        try {
+            boolean checkResult;
+            Configuration configuration = getConfiguration();
+            FileSystem fileSystem = FileSystem.get(configuration);
+            Path filePath = new Path(path);
+            FSDataInputStream in = fileSystem.open(filePath);
+            // try to get Postscript in orc file
+            long size = fileSystem.getFileStatus(filePath).getLen();
+            int readSize = (int) Math.min(size, MIN_SIZE);
+            in.seek(size - readSize);
+            ByteBuffer buffer = ByteBuffer.allocate(readSize);
+            in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+            int psLen = buffer.get(readSize - 1) & 0xff;
+            int len = OrcFile.MAGIC.length();
+            if (psLen < len + 1) {
+                in.close();
+                return false;
+            }
+            int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
+            byte[] array = buffer.array();
+            if (Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
+                checkResult = true;
+            } else {
+                // If it isn't there, this may be the 0.11.0 version of ORC.
+                // Read the first 3 bytes of the file to check for the header
+                in.seek(0);
+                byte[] header = new byte[len];
+                in.readFully(header, 0, len);
+                // if it isn't there, this isn't an ORC file
+                checkResult = Text.decode(header, 0, len).equals(OrcFile.MAGIC);
+            }
+            in.close();
+            return checkResult;
+        } catch (HivePluginException | IOException e) {
+            String errorMsg = String.format("Check orc file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
 }
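
The ORC check above follows the layout of an ORC file: the last byte stores the PostScript length, and the bytes immediately before it end with the magic string "ORC"; files written by ORC 0.11.0 only carry the magic at the start of the file, hence the fallback read of the header. Below is a minimal standalone sketch of the same idea, using a local RandomAccessFile instead of the patch's Hadoop FileSystem; the class and method names are illustrative and not part of the commit.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: mirrors the magic-byte logic of the patch on a local file.
    public class OrcMagicCheck {

        private static final byte[] ORC_MAGIC = "ORC".getBytes(StandardCharsets.UTF_8);

        static boolean looksLikeOrc(String path) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                long size = file.length();
                int magicLen = ORC_MAGIC.length;
                if (size < magicLen + 1) {
                    return false;
                }
                // The last byte of an ORC file is the PostScript length.
                file.seek(size - 1);
                int psLen = file.read() & 0xff;
                if (psLen < magicLen + 1) {
                    return false;
                }
                // The magic string sits immediately before the PostScript length byte.
                byte[] tail = new byte[magicLen];
                file.seek(size - 1 - magicLen);
                file.readFully(tail);
                if (new String(tail, StandardCharsets.UTF_8).equals("ORC")) {
                    return true;
                }
                // Older (0.11.0) ORC files only carry the magic at the start of the file.
                byte[] head = new byte[magicLen];
                file.seek(0);
                file.readFully(head);
                return new String(head, StandardCharsets.UTF_8).equals("ORC");
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(looksLikeOrc(args[0]));
        }
    }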
 
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
index 2895c933f..35d70e3ea 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
@@ -32,6 +32,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -47,6 +49,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -55,8 +58,13 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
 
     private SeaTunnelRowType seaTunnelRowType;
 
+    private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            throw new Exception("please check file type");
+        }
         Path filePath = new Path(path);
         HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
         int fieldsCount = seaTunnelRowType.getTotalFields();
@@ -148,6 +156,27 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
         }
     }
 
+    @Override
+    boolean checkFileType(String path) {
+        boolean checkResult;
+        byte[] magic = new byte[PARQUET_MAGIC.length];
+        try {
+            Configuration configuration = getConfiguration();
+            FileSystem fileSystem = FileSystem.get(configuration);
+            Path filePath = new Path(path);
+            FSDataInputStream in = fileSystem.open(filePath);
+            // try to get header information in a parquet file
+            in.seek(0);
+            in.readFully(magic);
+            checkResult = Arrays.equals(magic, PARQUET_MAGIC);
+            in.close();
+            return checkResult;
+        } catch (HivePluginException | IOException e) {
+            String errorMsg = String.format("Check parquet file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
     private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
         ObjectMapper objectMapper = new ObjectMapper();
         List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
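
The Parquet check is simpler: a Parquet file begins (and ends) with the 4-byte ASCII magic "PAR1", and the patch compares the first four bytes of the file against that constant. A standalone sketch of the same header check on a local file follows; the names are illustrative and not part of the commit.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    // Illustrative only: the same 4-byte header check as the patch, on a local file.
    public class ParquetMagicCheck {

        private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

        static boolean looksLikeParquet(String path) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                if (file.length() < PARQUET_MAGIC.length) {
                    return false;
                }
                // A Parquet file starts (and ends) with the ASCII magic "PAR1";
                // like the patch, only the header is checked here.
                byte[] header = new byte[PARQUET_MAGIC.length];
                file.seek(0);
                file.readFully(header);
                return Arrays.equals(header, PARQUET_MAGIC);
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(looksLikeParquet(args[0]));
        }
    }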