You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/28 03:00:09 UTC
[incubator-seatunnel] branch dev updated: [Connector-V2] Add file type check logic in hive connector (#2275)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5488337c6 [Connector-V2] Add file type check logic in hive connector (#2275)
5488337c6 is described below
commit 5488337c6710c43fdd1ba5e069160e918761721b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Jul 28 11:00:03 2022 +0800
[Connector-V2] Add file type check logic in hive connector (#2275)
---
.../source/file/reader/format/OrcReadStrategy.java | 45 ++++++++++++++++++++++
.../file/reader/format/ParquetReadStrategy.java | 29 ++++++++++++++
2 files changed, 74 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
index 325c1aacc..96ff1e983 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -46,6 +48,7 @@ import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
@@ -53,6 +56,7 @@ import java.util.Properties;
public class OrcReadStrategy extends AbstractReadStrategy {
private SeaTunnelRowType seaTunnelRowTypeInfo;
+ private static final long MIN_SIZE = 16 * 1024;
@Override
public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
@@ -119,5 +123,46 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return seaTunnelRowTypeInfo;
}
+    /**
+     * Checks whether the file at {@code path} looks like an ORC file.
+     *
+     * <p>Reads up to {@code MIN_SIZE} bytes from the end of the file, treats the last byte as the
+     * postscript length, and looks for {@link OrcFile#MAGIC} immediately before that byte. If the
+     * magic is not found there, falls back to looking for it at the very start of the file, which
+     * the original code comments attribute to ORC 0.11.0.
+     *
+     * @param path file to inspect
+     * @return {@code true} if the ORC magic is found at either location; {@code false} otherwise,
+     *     including files too short to hold the magic plus the postscript length byte
+     * @throws RuntimeException wrapping the cause if the configuration cannot be built or the file
+     *     cannot be opened or read
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    boolean checkFileType(String path) {
+        try {
+            Configuration configuration = getConfiguration();
+            Path filePath = new Path(path);
+            // Resolve the file system from the path itself so fully-qualified paths on a scheme
+            // other than fs.defaultFS are opened with the right FileSystem.
+            FileSystem fileSystem = filePath.getFileSystem(configuration);
+            long size = fileSystem.getFileStatus(filePath).getLen();
+            int magicLength = OrcFile.MAGIC.length();
+            // The tail check needs the magic plus the trailing postscript-length byte. Shorter
+            // files (including empty ones) would otherwise index outside the buffer.
+            if (size < magicLength + 1) {
+                return false;
+            }
+            // try-with-resources: the stream is closed on every return path and on failure
+            try (FSDataInputStream in = fileSystem.open(filePath)) {
+                // try to get Postscript in orc file
+                int readSize = (int) Math.min(size, MIN_SIZE);
+                in.seek(size - readSize);
+                ByteBuffer buffer = ByteBuffer.allocate(readSize);
+                in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                int psLen = buffer.get(readSize - 1) & 0xff;
+                if (psLen < magicLength + 1) {
+                    return false;
+                }
+                // The magic is expected right before the last (postscript length) byte.
+                int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - magicLength;
+                if (Text.decode(buffer.array(), offset, magicLength).equals(OrcFile.MAGIC)) {
+                    return true;
+                }
+                // If it isn't there, this may be the 0.11.0 version of ORC.
+                // Read the first bytes of the file to check for the header
+                in.seek(0);
+                byte[] header = new byte[magicLength];
+                in.readFully(header, 0, magicLength);
+                // if it isn't there, this isn't an ORC file
+                return Text.decode(header, 0, magicLength).equals(OrcFile.MAGIC);
+            }
+        } catch (HivePluginException | IOException e) {
+            String errorMsg = String.format("Check orc file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
index 2895c933f..35d70e3ea 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
@@ -32,6 +32,8 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -47,6 +49,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -55,8 +58,13 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
private SeaTunnelRowType seaTunnelRowType;
+ private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+
@Override
public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+ if (Boolean.FALSE.equals(checkFileType(path))) {
+ throw new Exception("please check file type");
+ }
Path filePath = new Path(path);
HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
int fieldsCount = seaTunnelRowType.getTotalFields();
@@ -148,6 +156,27 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
}
}
+    /**
+     * Checks whether the file at {@code path} starts with the Parquet magic bytes {@code PAR1}.
+     *
+     * @param path file to inspect
+     * @return {@code true} if the first bytes equal {@code PARQUET_MAGIC}; {@code false} otherwise,
+     *     including files shorter than the magic
+     * @throws RuntimeException wrapping the cause if the configuration cannot be built or the file
+     *     cannot be opened or read
+     */
+    @Override
+    boolean checkFileType(String path) {
+        byte[] magic = new byte[PARQUET_MAGIC.length];
+        try {
+            Configuration configuration = getConfiguration();
+            Path filePath = new Path(path);
+            // Resolve the file system from the path itself so fully-qualified paths on a scheme
+            // other than fs.defaultFS are opened with the right FileSystem.
+            FileSystem fileSystem = filePath.getFileSystem(configuration);
+            // A file shorter than the magic cannot be Parquet; without this guard readFully would
+            // hit end-of-file and the caller would get an exception instead of "false".
+            if (fileSystem.getFileStatus(filePath).getLen() < PARQUET_MAGIC.length) {
+                return false;
+            }
+            // try-with-resources: the stream is closed even if the read fails
+            try (FSDataInputStream in = fileSystem.open(filePath)) {
+                // try to get header information in a parquet file
+                in.seek(0);
+                in.readFully(magic);
+            }
+            return Arrays.equals(magic, PARQUET_MAGIC);
+        } catch (HivePluginException | IOException e) {
+            String errorMsg = String.format("Check parquet file [%s] error", path);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());