Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/06 09:28:37 UTC
[flink-table-store] branch master updated: [FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b3474aff [FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format
b3474aff is described below
commit b3474aff36a661677e22d56e627ef227390c2689
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jan 6 17:28:32 2023 +0800
[FLINK-30582] Flink-avro Flink-orc free for flink-table-store-format
This closes #461
---
.../flink-table-store-micro-benchmarks/pom.xml | 6 -
.../apache/flink/table/store/utils/MapBuilder.java | 38 ++
flink-table-store-format/pom.xml | 168 ++---
.../store/format/avro/AbstractAvroBulkFormat.java | 1 -
.../flink/table/store/format/avro/AvroBuilder.java | 33 +
.../table/store/format/avro/AvroBulkWriter.java | 56 ++
.../table/store/format/avro/AvroFileFormat.java | 19 +-
.../store/format/avro/AvroSchemaConverter.java | 216 +++++++
.../store/format/avro/AvroToRowDataConverters.java | 262 ++++++++
.../table/store/format/avro/AvroWriterFactory.java | 49 ++
.../store/format/avro/CloseShieldOutputStream.java | 56 ++
.../format/avro/FSDataInputStreamWrapper.java | 67 ++
.../table/store/format/avro/JodaConverter.java | 70 +++
.../store/format/avro/RowDataToAvroConverters.java | 311 ++++++++++
.../table/store/format/orc/OrcFileFormat.java | 33 +-
.../store/format/orc/OrcInputFormatFactory.java | 104 ----
.../table/store/format/orc/OrcReaderFactory.java | 403 ++++++++++++
.../flink/table/store/format/orc/OrcShimImpl.java | 163 -----
.../table/store/format/orc/OrcWriterFactory.java | 125 ++++
.../orc/SerializableHadoopConfigWrapper.java | 78 +++
.../orc/ThreadLocalClassLoaderConfiguration.java | 58 ++
.../orc/{ => filter}/OrcFileStatsExtractor.java | 5 +-
.../table/store/format/orc/filter/OrcFilters.java | 685 +++++++++++++++++++++
.../{ => filter}/OrcPredicateFunctionVisitor.java | 3 +-
.../format/orc/reader/AbstractOrcColumnVector.java | 79 +++
.../format/orc/reader/OrcArrayColumnVector.java | 47 ++
.../format/orc/reader/OrcBytesColumnVector.java | 42 ++
.../format/orc/reader/OrcDecimalColumnVector.java | 46 ++
.../format/orc/reader/OrcDoubleColumnVector.java | 47 ++
.../orc/reader/OrcLegacyTimestampColumnVector.java | 91 +++
.../format/orc/reader/OrcLongColumnVector.java | 65 ++
.../format/orc/reader/OrcMapColumnVector.java | 49 ++
.../format/orc/reader/OrcRowColumnVector.java | 49 ++
.../format/orc/reader/OrcSplitReaderUtil.java | 99 +++
.../orc/reader/OrcTimestampColumnVector.java | 49 ++
.../store/format/orc/writer/OrcBulkWriter.java | 75 +++
.../format/orc/writer/PhysicalWriterImpl.java | 398 ++++++++++++
.../store/format/orc/writer/RowDataVectorizer.java | 277 +++++++++
.../table/store/format/orc/writer/Vectorizer.java | 96 +++
.../src/main/resources/META-INF/NOTICE | 6 +
.../store/format/avro/AvroBulkFormatTest.java | 284 +++++++++
.../store/format/avro/AvroBulkFormatTestUtils.java | 70 +++
.../store/format/orc/OrcBulkWriterTestUtil.java | 96 +++
.../format/orc/OrcFileStatsExtractorTest.java | 1 +
.../store/format/orc/OrcFilterConverterTest.java | 3 +-
.../store/format/orc/OrcReaderFactoryTest.java | 291 +++++++++
.../store/format/orc/OrcSplitReaderUtilTest.java | 67 ++
.../store/format/orc/OrcWriterFactoryTest.java | 89 +++
.../flink/table/store/format/orc/Record.java | 55 ++
.../table/store/format/orc/RecordVectorizer.java | 55 ++
.../src/test/resources/test-data-decimal.orc | Bin 0 -> 16337 bytes
.../src/test/resources/test-data-flat.orc | Bin 0 -> 408522 bytes
.../flink-table-store-hive-catalog/pom.xml | 13 -
.../flink/table/store/spark/SparkS3ITCase.java | 28 +-
pom.xml | 8 -
55 files changed, 5177 insertions(+), 407 deletions(-)
diff --git a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
index db990385..67ba847c 100644
--- a/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
+++ b/flink-table-store-benchmark/flink-table-store-micro-benchmarks/pom.xml
@@ -131,12 +131,6 @@ under the License.
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>${flink.sql.parquet}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
</dependencies>
<!-- This is copied from flink-benchmarks and updated for flink-table-store-micro-benchmarks. -->
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java
new file mode 100644
index 00000000..dc92f4d9
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MapBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Builder for {@link Map}. */
+public class MapBuilder<K, V> {
+
+ private final Map<K, V> map = new HashMap<>();
+
+ public MapBuilder<K, V> put(K k, V v) {
+ map.put(k, v);
+ return this;
+ }
+
+ public Map<K, V> unmodifiable() {
+ return Collections.unmodifiableMap(map);
+ }
+}
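
[For illustration, a minimal usage sketch of the new MapBuilder; the option keys and values below are hypothetical, not part of this commit:]

    import java.util.Map;
    import org.apache.flink.table.store.utils.MapBuilder;

    Map<String, String> options =
            new MapBuilder<String, String>()
                    .put("format", "avro")   // hypothetical key/value
                    .put("codec", "snappy")  // hypothetical key/value
                    .unmodifiable();         // returns an unmodifiable view of the map
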
diff --git a/flink-table-store-format/pom.xml b/flink-table-store-format/pom.xml
index f548e487..6f256d0a 100644
--- a/flink-table-store-format/pom.xml
+++ b/flink-table-store-format/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<parquet.version>1.12.3</parquet.version>
+ <orc.version>1.5.6</orc.version>
</properties>
<dependencies>
@@ -59,74 +60,91 @@ under the License.
<scope>provided</scope>
</dependency>
- <!-- format dependencies -->
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${flink.version}</version>
- </dependency>
+ <!-- Hadoop -->
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-avro</artifactId>
- <version>${flink.version}</version>
- <scope>runtime</scope>
- <optional>true</optional>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
<exclusions>
<exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>${flink.orc}</artifactId>
- <version>${flink.version}</version>
- </dependency>
+ <!-- Orc Start -->
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>${flink.sql.orc}</artifactId>
- <version>${flink.version}</version>
- <scope>runtime</scope>
- <optional>true</optional>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>${orc.version}</version>
<exclusions>
+ <!-- Exclude ORC's Hadoop dependency and pull in provided vanilla hadoop. -->
<exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
</exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
<exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
+ <groupId>javax.activation</groupId>
+ <artifactId>javax.activation-api</artifactId>
</exclusion>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.12.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Orc End -->
+
+ <!-- Avro Start -->
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>provided</scope>
+ <!-- Avro records can contain JodaTime fields when using logical types.
+ In order to handle them, we need to add an optional dependency.
+ Users with such Avro records need to add this dependency themselves. -->
+ <optional>true</optional>
+ <version>2.5</version>
+ </dependency>
+
+ <!-- Avro End -->
+
<!-- Parquet Start -->
<dependency>
@@ -254,8 +272,20 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
- <include>org.apache.flink:flink-sql-avro</include>
- <include>org.apache.flink:${flink.sql.orc}</include>
+ <!-- Orc -->
+ <include>org.apache.orc:orc-core</include>
+ <include>org.apache.orc:orc-shims</include>
+ <include>org.apache.hive:hive-storage-api</include>
+ <include>io.airlift:aircompressor</include>
+ <include>commons-lang:commons-lang</include>
+ <include>com.google.protobuf:protobuf-java</include>
+
+ <!-- Avro -->
+ <include>org.apache.avro:avro</include>
+ <include>com.fasterxml.jackson.core:jackson-core</include>
+ <include>com.fasterxml.jackson.core:jackson-databind</include>
+ <include>com.fasterxml.jackson.core:jackson-annotations</include>
+ <include>org.apache.commons:commons-compress</include>
<!-- Parquet -->
<include>org.apache.parquet:parquet-avro</include>
@@ -270,23 +300,6 @@ under the License.
</includes>
</artifactSet>
<filters>
- <!--
- Throw away all META-INF/services,
- otherwise if user has the same format/connector jar in the classpath,
- FactoryUtil will complain about multiple matching factories.
- -->
- <filter>
- <artifact>org.apache.flink:flink-sql-avro</artifact>
- <excludes>
- <exclude>META-INF/services/**</exclude>
- </excludes>
- </filter>
- <filter>
- <artifact>org.apache.flink:${flink.sql.orc}</artifact>
- <excludes>
- <exclude>META-INF/services/**</exclude>
- </excludes>
- </filter>
<!-- Another copy of the Apache license, which we don't need. -->
<filter>
<artifact>*</artifact>
@@ -296,22 +309,7 @@ under the License.
</filter>
</filters>
<relocations>
- <relocation>
- <pattern>org.apache.avro</pattern>
- <shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.flink.formats.avro</pattern>
- <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.formats.avro</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.flink.orc</pattern>
- <shadedPattern>org.apache.flink.table.store.shaded.org.apache.flink.orc</shadedPattern>
- </relocation>
- <!--
- flink-sql-orc module does not shade its dependencies, so we shade here.
- See maven-shade-plugin usage of flink-sql-orc for detailed dependency list.
- -->
+ <!-- Relocate Orc -->
<relocation>
<pattern>org.apache.orc</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.orc</shadedPattern>
@@ -333,19 +331,31 @@ under the License.
<shadedPattern>org.apache.flink.table.store.shaded.com.google.protobuf</shadedPattern>
</relocation>
+ <!-- Relocate Avro. -->
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.avro</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+
<!-- Relocate parquet. -->
<relocation>
<pattern>org.apache.parquet</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.parquet</shadedPattern>
</relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
- </relocation>
<relocation>
<pattern>shaded.parquet</pattern>
<shadedPattern>org.apache.flink.table.store.shaded.parquet</shadedPattern>
</relocation>
+
+ <!-- Relocate Common. -->
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
index 66ba335b..ef232179 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.file.src.util.IteratorResultIterator;
import org.apache.flink.connector.file.src.util.Pool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java
new file mode 100644
index 00000000..9e1a18b3
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/** A builder to create an {@link AvroBulkWriter} from an {@link OutputStream}. */
+@FunctionalInterface
+public interface AvroBuilder<T> extends Serializable {
+
+ /** Creates and configures an Avro writer for the given output stream. */
+ DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
+}
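
[For illustration, a minimal sketch of an AvroBuilder implementation; an Avro Schema named 'schema' is assumed to be in scope:]

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    AvroBuilder<GenericRecord> builder =
            out -> {
                // Wrap a datum writer, pick a codec, and bind the writer to the stream.
                DataFileWriter<GenericRecord> writer =
                        new DataFileWriter<>(new GenericDatumWriter<>(schema));
                writer.setCodec(CodecFactory.snappyCodec());
                writer.create(schema, out); // 'schema' is the assumed Avro Schema
                return writer;
            };
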
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java
new file mode 100644
index 00000000..f159d413
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroBulkWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+
+/** A simple {@link BulkWriter} implementation that wraps an Avro {@link DataFileWriter}. */
+public class AvroBulkWriter<T> implements BulkWriter<T> {
+
+ /** The underlying Avro writer. */
+ private final DataFileWriter<T> dataFileWriter;
+
+ /**
+ * Create a new AvroBulkWriter wrapping the given Avro {@link DataFileWriter}.
+ *
+ * @param dataFileWriter The underlying Avro writer.
+ */
+ public AvroBulkWriter(DataFileWriter<T> dataFileWriter) {
+ this.dataFileWriter = dataFileWriter;
+ }
+
+ @Override
+ public void addElement(T element) throws IOException {
+ dataFileWriter.append(element);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ dataFileWriter.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ dataFileWriter.close();
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
index f908fae2..2b3e413c 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java
@@ -20,15 +20,12 @@ package org.apache.flink.table.store.format.avro;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.formats.avro.AvroBuilder;
-import org.apache.flink.formats.avro.AvroToRowDataConverters;
-import org.apache.flink.formats.avro.AvroWriterFactory;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -53,15 +50,23 @@ import java.io.OutputStream;
import java.util.List;
import java.util.function.Function;
-import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
/** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
public class AvroFileFormat extends FileFormat {
+ public static final String IDENTIFIER = "avro";
+
+ public static final ConfigOption<String> AVRO_OUTPUT_CODEC =
+ ConfigOptions.key("codec")
+ .stringType()
+ .defaultValue(SNAPPY_CODEC)
+ .withDescription("The compression codec for avro");
+
private final ReadableConfig formatOptions;
public AvroFileFormat(ReadableConfig formatOptions) {
- super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
+ super(IDENTIFIER);
this.formatOptions = formatOptions;
}
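
[For illustration, a sketch of setting the new "codec" option when constructing the format; illustrative only, the default remains Snappy:]

    import org.apache.flink.configuration.Configuration;

    Configuration formatOptions = new Configuration();
    // "null" is Avro's name for the uncompressed codec.
    formatOptions.set(AvroFileFormat.AVRO_OUTPUT_CODEC, "null");
    AvroFileFormat format = new AvroFileFormat(formatOptions);
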
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
new file mode 100644
index 00000000..73a9c37b
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroSchemaConverter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import java.util.List;
+
+/**
+ * Converts Flink SQL {@link LogicalType}s (possibly nested) into Avro schemas. The generated
+ * schemas use Avro types that are compatible with Flink's Table & SQL API.
+ */
+public class AvroSchemaConverter {
+
+ private AvroSchemaConverter() {
+ // private
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+ *
+ * <p>Uses "org.apache.flink.avro.generated.record" as the record type name.
+ *
+ * @param schema the logical type, usually the top-level record type, i.e. not a nested type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType schema) {
+ return convertToSchema(schema, "org.apache.flink.avro.generated.record");
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+ *
+ * <p>The "{rowName}_" prefix is used for nested row type names in order to generate the right
+ * schema. Nested record types that differ only in their type names are still compatible.
+ *
+ * @param logicalType logical type
+ * @param rowName the record name
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+ int precision;
+ boolean nullable = logicalType.isNullable();
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return SchemaBuilder.builder().nullType();
+ case BOOLEAN:
+ Schema bool = SchemaBuilder.builder().booleanType();
+ return nullable ? nullableSchema(bool) : bool;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ Schema integer = SchemaBuilder.builder().intType();
+ return nullable ? nullableSchema(integer) : integer;
+ case BIGINT:
+ Schema bigint = SchemaBuilder.builder().longType();
+ return nullable ? nullableSchema(bigint) : bigint;
+ case FLOAT:
+ Schema f = SchemaBuilder.builder().floatType();
+ return nullable ? nullableSchema(f) : f;
+ case DOUBLE:
+ Schema d = SchemaBuilder.builder().doubleType();
+ return nullable ? nullableSchema(d) : d;
+ case CHAR:
+ case VARCHAR:
+ Schema str = SchemaBuilder.builder().stringType();
+ return nullable ? nullableSchema(str) : str;
+ case BINARY:
+ case VARBINARY:
+ Schema binary = SchemaBuilder.builder().bytesType();
+ return nullable ? nullableSchema(binary) : binary;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // use long to represent Timestamp
+ final TimestampType timestampType = (TimestampType) logicalType;
+ precision = timestampType.getPrecision();
+ org.apache.avro.LogicalType avroLogicalType;
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.timestampMillis();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support TIMESTAMP type "
+ + "with precision: "
+ + precision
+ + ", it only supports precision less than or equal to 3.");
+ }
+ Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullable ? nullableSchema(timestamp) : timestamp;
+ case DATE:
+ // use int to represent Date
+ Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(date) : date;
+ case TIME_WITHOUT_TIME_ZONE:
+ precision = ((TimeType) logicalType).getPrecision();
+ if (precision > 3) {
+ throw new IllegalArgumentException(
+ "Avro does not support TIME type with precision: "
+ + precision
+ + ", it only supports precision less than or equal to 3.");
+ }
+ // use int to represent Time; only millisecond precision is supported during deserialization
+ Schema time =
+ LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(time) : time;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ // store BigDecimal as byte[]
+ Schema decimal =
+ LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
+ .addToSchema(SchemaBuilder.builder().bytesType());
+ return nullable ? nullableSchema(decimal) : decimal;
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ List<String> fieldNames = rowType.getFieldNames();
+ // we have to make sure each record name is unique within the Schema
+ SchemaBuilder.FieldAssembler<Schema> builder =
+ SchemaBuilder.builder().record(rowName).fields();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = fieldNames.get(i);
+ LogicalType fieldType = rowType.getTypeAt(i);
+ SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+ builder.name(fieldName)
+ .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+ if (fieldType.isNullable()) {
+ builder = fieldBuilder.withDefault(null);
+ } else {
+ builder = fieldBuilder.noDefault();
+ }
+ }
+ Schema record = builder.endRecord();
+ return nullable ? nullableSchema(record) : record;
+ case MULTISET:
+ case MAP:
+ Schema map =
+ SchemaBuilder.builder()
+ .map()
+ .values(
+ convertToSchema(
+ extractValueTypeToAvroMap(logicalType), rowName));
+ return nullable ? nullableSchema(map) : map;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ Schema array =
+ SchemaBuilder.builder()
+ .array()
+ .items(convertToSchema(arrayType.getElementType(), rowName));
+ return nullable ? nullableSchema(array) : array;
+ case RAW:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported to derive Schema for type: " + logicalType);
+ }
+ }
+
+ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ } else {
+ MultisetType multisetType = (MultisetType) type;
+ keyType = multisetType.getElementType();
+ valueType = new IntType();
+ }
+ if (keyType.getTypeRoot() != LogicalTypeRoot.VARCHAR
+ && keyType.getTypeRoot() != LogicalTypeRoot.CHAR) {
+ throw new UnsupportedOperationException(
+ "Avro format doesn't support non-string as key type of map. "
+ + "The key type is: "
+ + keyType.asSummaryString());
+ }
+ return valueType;
+ }
+
+ /** Returns a nullable version of the given schema. */
+ private static Schema nullableSchema(Schema schema) {
+ return schema.isNullable()
+ ? schema
+ : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+ }
+}
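
[For illustration, a sketch of converting a Flink row type to an Avro schema with the copied converter; the field names are hypothetical:]

    import org.apache.avro.Schema;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    RowType rowType =
            RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                    new String[] {"id", "name"});
    // Yields an Avro record schema; nullable types become unions with the null type.
    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
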
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java
new file mode 100644
index 00000000..ad6816ab
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroToRowDataConverters.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. */
+@Internal
+public class AvroToRowDataConverters {
+
+ /**
+ * Runtime converter that converts Avro data structures into objects of Flink Table & SQL
+ * internal data structures.
+ */
+ @FunctionalInterface
+ public interface AvroToRowDataConverter extends Serializable {
+ Object convert(Object object);
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Runtime Converters
+ // -------------------------------------------------------------------------------------
+
+ public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+ final AvroToRowDataConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(AvroToRowDataConverters::createNullableConverter)
+ .toArray(AvroToRowDataConverter[]::new);
+ final int arity = rowType.getFieldCount();
+
+ return avroObject -> {
+ IndexedRecord record = (IndexedRecord) avroObject;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; ++i) {
+ // Avro always deserializes successfully even if the type doesn't match,
+ // so there is no need to report which field failed to deserialize
+ row.setField(i, fieldConverters[i].convert(record.get(i)));
+ }
+ return row;
+ };
+ }
+
+ /** Creates a runtime converter which is null safe. */
+ private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
+ final AvroToRowDataConverter converter = createConverter(type);
+ return avroObject -> {
+ if (avroObject == null) {
+ return null;
+ }
+ return converter.convert(avroObject);
+ };
+ }
+
+ /** Creates a runtime converter which assumes the input object is not null. */
+ private static AvroToRowDataConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return avroObject -> null;
+ case TINYINT:
+ return avroObject -> ((Integer) avroObject).byteValue();
+ case SMALLINT:
+ return avroObject -> ((Integer) avroObject).shortValue();
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ return avroObject -> avroObject;
+ case DATE:
+ return AvroToRowDataConverters::convertToDate;
+ case TIME_WITHOUT_TIME_ZONE:
+ return AvroToRowDataConverters::convertToTime;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return AvroToRowDataConverters::convertToTimestamp;
+ case CHAR:
+ case VARCHAR:
+ return avroObject -> StringData.fromString(avroObject.toString());
+ case BINARY:
+ case VARBINARY:
+ return AvroToRowDataConverters::convertToBytes;
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type);
+ case ROW:
+ return createRowConverter((RowType) type);
+ case MAP:
+ case MULTISET:
+ return createMapConverter(type);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return avroObject -> {
+ final byte[] bytes;
+ if (avroObject instanceof GenericFixed) {
+ bytes = ((GenericFixed) avroObject).bytes();
+ } else if (avroObject instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) avroObject;
+ bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ } else {
+ bytes = (byte[]) avroObject;
+ }
+ return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+ };
+ }
+
+ private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
+ final AvroToRowDataConverter elementConverter =
+ createNullableConverter(arrayType.getElementType());
+ final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+
+ return avroObject -> {
+ final List<?> list = (List<?>) avroObject;
+ final int length = list.size();
+ final Object[] array = (Object[]) Array.newInstance(elementClass, length);
+ for (int i = 0; i < length; ++i) {
+ array[i] = elementConverter.convert(list.get(i));
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+ final AvroToRowDataConverter keyConverter =
+ createConverter(DataTypes.STRING().getLogicalType());
+ final AvroToRowDataConverter valueConverter =
+ createNullableConverter(extractValueTypeToAvroMap(type));
+
+ return avroObject -> {
+ final Map<?, ?> map = (Map<?, ?>) avroObject;
+ Map<Object, Object> result = new HashMap<>();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Object key = keyConverter.convert(entry.getKey());
+ Object value = valueConverter.convert(entry.getValue());
+ result.put(key, value);
+ }
+ return new GenericMapData(result);
+ };
+ }
+
+ private static TimestampData convertToTimestamp(Object object) {
+ final long millis;
+ if (object instanceof Long) {
+ millis = (Long) object;
+ } else if (object instanceof Instant) {
+ millis = ((Instant) object).toEpochMilli();
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTimestamp(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIMESTAMP logical type. Received: " + object);
+ }
+ }
+ return TimestampData.fromEpochMillis(millis);
+ }
+
+ private static int convertToDate(Object object) {
+ if (object instanceof Integer) {
+ return (Integer) object;
+ } else if (object instanceof LocalDate) {
+ return (int) ((LocalDate) object).toEpochDay();
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ return (int) jodaConverter.convertDate(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for DATE logical type. Received: " + object);
+ }
+ }
+ }
+
+ private static int convertToTime(Object object) {
+ final int millis;
+ if (object instanceof Integer) {
+ millis = (Integer) object;
+ } else if (object instanceof LocalTime) {
+ millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTime(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIME logical type. Received: " + object);
+ }
+ }
+ return millis;
+ }
+
+ private static byte[] convertToBytes(Object object) {
+ if (object instanceof GenericFixed) {
+ return ((GenericFixed) object).bytes();
+ } else if (object instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) object;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ return (byte[]) object;
+ }
+ }
+}
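
[For illustration, a sketch of the read path: build a converter for a row type and apply it to a deserialized record; 'rowType' and 'avroRecord' are assumed to be in scope:]

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.table.data.RowData;

    AvroToRowDataConverters.AvroToRowDataConverter converter =
            AvroToRowDataConverters.createRowConverter(rowType);
    RowData row = (RowData) converter.convert(avroRecord); // avroRecord: a GenericRecord
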
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java
new file mode 100644
index 00000000..508e0a02
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroWriterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import org.apache.avro.file.DataFileWriter;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates an {@link AvroBulkWriter}. The factory takes a user-supplied builder to
+ * assemble Avro's writer and then turns it into a Flink {@code BulkWriter}.
+ *
+ * @param <T> The type of record to write.
+ */
+public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
+ private static final long serialVersionUID = 1L;
+
+ /** The builder to construct the Avro {@link DataFileWriter}. */
+ private final AvroBuilder<T> avroBuilder;
+
+ /** Creates a new AvroWriterFactory using the given builder to assemble the Avro writer. */
+ public AvroWriterFactory(AvroBuilder<T> avroBuilder) {
+ this.avroBuilder = avroBuilder;
+ }
+
+ @Override
+ public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+ return new AvroBulkWriter<>(avroBuilder.createWriter(new CloseShieldOutputStream(out)));
+ }
+}
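
[For illustration, a sketch wiring an AvroBuilder (like the one sketched above) through the factory; 'builder', 'record', and 'outputStream' (a Flink FSDataOutputStream) are assumed:]

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.BulkWriter;

    AvroWriterFactory<GenericRecord> factory = new AvroWriterFactory<>(builder);
    BulkWriter<GenericRecord> writer = factory.create(outputStream);
    writer.addElement(record); // 'record' is an assumed GenericRecord
    writer.finish();           // closes the Avro writer, not the shielded stream
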
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java
new file mode 100644
index 00000000..d29970f2
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/CloseShieldOutputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** A proxy output stream that prevents the underlying output stream from being closed. */
+public class CloseShieldOutputStream extends OutputStream {
+ private final OutputStream out;
+
+ public CloseShieldOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] buffer) throws IOException {
+ out.write(buffer);
+ }
+
+ @Override
+ public void write(byte[] buffer, int off, int len) throws IOException {
+ out.write(buffer, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Do not actually close the internal stream.
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 00000000..0b1893e4
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Code copy-pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+
+ private final FSDataInputStream stream;
+ private final long len;
+
+ public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+ this.stream = stream;
+ this.len = len;
+ }
+
+ @Override
+ public long length() throws IOException {
+ return this.len;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public void seek(long p) throws IOException {
+ stream.seek(p);
+ }
+
+ @Override
+ public long tell() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+}
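
[For illustration, a sketch of reading an Avro file through the wrapper with Avro's DataFileReader; 'in' is an assumed Flink FSDataInputStream and 'fileLen' its length:]

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    DataFileReader<GenericRecord> reader =
            new DataFileReader<>(
                    new FSDataInputStreamWrapper(in, fileLen),
                    new GenericDatumReader<>());
    while (reader.hasNext()) {
        GenericRecord record = reader.next();
        // process record ...
    }
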
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java
new file mode 100644
index 00000000..4a333d52
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/JodaConverter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates the optional Joda-Time dependency. This class is instantiated only if Joda-Time is
+ * available on the classpath.
+ */
+class JodaConverter {
+
+ private static JodaConverter instance;
+ private static boolean instantiated = false;
+
+ public static JodaConverter getConverter() {
+ if (instantiated) {
+ return instance;
+ }
+
+ try {
+ Class.forName(
+ "org.joda.time.DateTime",
+ false,
+ Thread.currentThread().getContextClassLoader());
+ instance = new JodaConverter();
+ } catch (ClassNotFoundException e) {
+ instance = null;
+ } finally {
+ instantiated = true;
+ }
+ return instance;
+ }
+
+ public long convertDate(Object object) {
+ final LocalDate value = (LocalDate) object;
+ return value.toDate().getTime();
+ }
+
+ public int convertTime(Object object) {
+ final LocalTime value = (LocalTime) object;
+ return value.get(DateTimeFieldType.millisOfDay());
+ }
+
+ public long convertTimestamp(Object object) {
+ final DateTime value = (DateTime) object;
+ return value.toDate().getTime();
+ }
+
+ private JodaConverter() {}
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java
new file mode 100644
index 00000000..cebc3091
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/RowDataToAvroConverters.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}. */
+@Internal
+public class RowDataToAvroConverters {
+
+ // --------------------------------------------------------------------------------
+ // Runtime Converters
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+ * corresponding Avro data structures.
+ */
+ @FunctionalInterface
+ public interface RowDataToAvroConverter extends Serializable {
+ Object convert(Schema schema, Object object);
+ }
+
+ // --------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+ // sql-client uber jars.
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Creates a runtime converter according to the given logical type that converts objects of
+ * Flink Table & SQL internal data structures to corresponding Avro data structures.
+ */
+ public static RowDataToAvroConverter createConverter(LogicalType type) {
+ final RowDataToAvroConverter converter;
+ switch (type.getTypeRoot()) {
+ case NULL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return null;
+ }
+ };
+ break;
+ case TINYINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Byte) object).intValue();
+ }
+ };
+ break;
+ case SMALLINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Short) object).intValue();
+ }
+ };
+ break;
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ case TIME_WITHOUT_TIME_ZONE: // int
+ case DATE: // int
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return object;
+ }
+ };
+ break;
+ case CHAR:
+ case VARCHAR:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return new Utf8(object.toString());
+ }
+ };
+ break;
+ case BINARY:
+ case VARBINARY:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap((byte[]) object);
+ }
+ };
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((TimestampData) object).toInstant().toEpochMilli();
+ }
+ };
+ break;
+ case DECIMAL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+ }
+ };
+ break;
+ case ARRAY:
+ converter = createArrayConverter((ArrayType) type);
+ break;
+ case ROW:
+ converter = createRowConverter((RowType) type);
+ break;
+ case MAP:
+ case MULTISET:
+ converter = createMapConverter(type);
+ break;
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+
+ // wrap into nullable converter
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ // get actual schema if it is a nullable schema
+ Schema actualSchema;
+ if (schema.getType() == Schema.Type.UNION) {
+ List<Schema> types = schema.getTypes();
+ int size = types.size();
+ if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(0);
+ } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(1);
+ } else {
+ throw new IllegalArgumentException(
+ "The Avro schema is not a nullable type: " + schema);
+ }
+ } else {
+ actualSchema = schema;
+ }
+ return converter.convert(actualSchema, object);
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+ final RowDataToAvroConverter[] fieldConverters =
+ rowType.getChildren().stream()
+ .map(RowDataToAvroConverters::createConverter)
+ .toArray(RowDataToAvroConverter[]::new);
+ final LogicalType[] fieldTypes =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+ }
+ final int length = rowType.getFieldCount();
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final RowData row = (RowData) object;
+ final List<Schema.Field> fields = schema.getFields();
+ final GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < length; ++i) {
+ final Schema.Field schemaField = fields.get(i);
+ try {
+ Object avroObject =
+ fieldConverters[i].convert(
+ schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
+ record.put(i, avroObject);
+ } catch (Throwable t) {
+ throw new RuntimeException(
+ String.format(
+ "Fail to serialize at field: %s.", schemaField.name()),
+ t);
+ }
+ }
+ return record;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
+ LogicalType elementType = arrayType.getElementType();
+ final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+ final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema elementSchema = schema.getElementType();
+ ArrayData arrayData = (ArrayData) object;
+ List<Object> list = new ArrayList<>();
+ for (int i = 0; i < arrayData.size(); ++i) {
+ list.add(
+ elementConverter.convert(
+ elementSchema, elementGetter.getElementOrNull(arrayData, i)));
+ }
+ return list;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+ LogicalType valueType = extractValueTypeToAvroMap(type);
+ final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+ final RowDataToAvroConverter valueConverter = createConverter(valueType);
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema valueSchema = schema.getValueType();
+ final MapData mapData = (MapData) object;
+ final ArrayData keyArray = mapData.keyArray();
+ final ArrayData valueArray = mapData.valueArray();
+ final Map<Object, Object> map = new HashMap<>(mapData.size());
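+                // Avro maps always use string keys, so the key array is read as strings while
+                // the values go through the value converter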
+ for (int i = 0; i < mapData.size(); ++i) {
+ final String key = keyArray.getString(i).toString();
+ final Object value =
+ valueConverter.convert(
+ valueSchema, valueGetter.getElementOrNull(valueArray, i));
+ map.put(key, value);
+ }
+ return map;
+ }
+ };
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 2c68889f..bf593a56 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -24,17 +24,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.OrcSplitReaderUtil;
-import org.apache.flink.orc.vector.RowDataVectorizer;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
-import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.format.orc.filter.OrcFileStatsExtractor;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.store.format.orc.filter.OrcPredicateFunctionVisitor;
+import org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil;
+import org.apache.flink.table.store.format.orc.writer.RowDataVectorizer;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
import org.apache.flink.table.store.utils.Projection;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.IntType;
@@ -53,17 +53,17 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
-import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
-
/** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
public class OrcFileFormat extends FileFormat {
+ public static final String IDENTIFIER = "orc";
+
private final Properties orcProperties;
private final org.apache.hadoop.conf.Configuration readerConf;
private final org.apache.hadoop.conf.Configuration writerConf;
public OrcFileFormat(Configuration formatOptions) {
- super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
+ super(IDENTIFIER);
this.orcProperties = getOrcProperties(formatOptions);
this.readerConf = new org.apache.hadoop.conf.Configuration();
this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
@@ -93,19 +93,20 @@ public class OrcFileFormat extends FileFormat {
}
}
- return OrcInputFormatFactory.create(
+ return new OrcReaderFactory(
readerConf,
(RowType) refineLogicalType(type),
Projection.of(projection).toTopLevelIndexes(),
- orcPredicates);
+ orcPredicates,
+ 2048);
}
/**
- * The {@link OrcBulkWriterFactory} will create {@link ThreadLocalClassLoaderConfiguration} from
- * the input writer config to avoid classloader leaks.
+ * The {@link OrcWriterFactory} will create {@link ThreadLocalClassLoaderConfiguration} from the
+ * input writer config to avoid classloader leaks.
*
- * <p>TODO: The {@link ThreadLocalClassLoaderConfiguration} in {@link OrcBulkWriterFactory}
- * should be removed after https://issues.apache.org/jira/browse/ORC-653 is fixed.
+ * <p>TODO: The {@link ThreadLocalClassLoaderConfiguration} in {@link OrcWriterFactory} should
+ * be removed after https://issues.apache.org/jira/browse/ORC-653 is fixed.
*
* @param type The data type for the {@link BulkWriter}
* @return The factory of the {@link BulkWriter}
@@ -119,7 +120,7 @@ public class OrcFileFormat extends FileFormat {
Vectorizer<RowData> vectorizer =
new RowDataVectorizer(typeDescription.toString(), orcTypes);
- return new OrcBulkWriterFactory<>(vectorizer, orcProperties, writerConf);
+ return new OrcWriterFactory<>(vectorizer, orcProperties, writerConf);
}
private static Properties getOrcProperties(ReadableConfig options) {
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
deleted file mode 100644
index e5e5e3ec..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.utils.ReflectionUtils;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-
-/** Factory to create orc input format for different Flink versions. */
-public class OrcInputFormatFactory {
-
- public static BulkFormat<RowData, FileSourceSplit> create(
- Configuration conf,
- RowType type,
- int[] projection,
- List<OrcFilters.Predicate> orcPredicates) {
- try {
- return createFrom115(conf, type, projection, orcPredicates);
- } catch (ClassNotFoundException e) {
- try {
- return createFrom114(conf, type, projection, orcPredicates);
- } catch (ClassNotFoundException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- private static BulkFormat<RowData, FileSourceSplit> createFrom115(
- Configuration conf,
- RowType type,
- int[] projection,
- List<OrcFilters.Predicate> orcPredicates)
- throws ClassNotFoundException {
- Class<?> formatClass = Class.forName("org.apache.flink.orc.OrcColumnarRowInputFormat");
- try {
- return ReflectionUtils.invokeStaticMethod(
- formatClass,
- "createPartitionedFormat",
- new OrcShimImpl(),
- conf,
- type,
- Collections.emptyList(),
- null,
- projection,
- orcPredicates,
- 2048,
- (Function<RowType, TypeInformation<RowData>>) InternalTypeInfo::of);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static BulkFormat<RowData, FileSourceSplit> createFrom114(
- Configuration conf,
- RowType type,
- int[] projection,
- List<OrcFilters.Predicate> orcPredicates)
- throws ClassNotFoundException {
- Class<?> formatClass = Class.forName("org.apache.flink.orc.OrcColumnarRowFileInputFormat");
- try {
- return ReflectionUtils.invokeStaticMethod(
- formatClass,
- "createPartitionedFormat",
- new OrcShimImpl(),
- conf,
- type,
- Collections.emptyList(),
- null,
- projection,
- orcPredicates,
- 2048);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java
new file mode 100644
index 00000000..5ce6a835
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcReaderFactory.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarRowData;
+import org.apache.flink.table.store.data.columnar.ColumnarRowIterator;
+import org.apache.flink.table.store.data.columnar.VectorizedColumnBatch;
+import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.table.store.format.orc.reader.AbstractOrcColumnVector.createFlinkVector;
+import static org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil.logicalTypeToOrcType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An ORC reader that produces a stream of {@link ColumnarRowData} records. */
+public class OrcReaderFactory implements BulkFormat<RowData, FileSourceSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final SerializableHadoopConfigWrapper hadoopConfigWrapper;
+
+ protected final TypeDescription schema;
+
+ private final RowType tableType;
+
+ protected final int[] selectedFields;
+
+ protected final List<OrcFilters.Predicate> conjunctPredicates;
+
+ protected final int batchSize;
+
+ /**
+     * @param hadoopConfig the Hadoop configuration for the ORC reader.
+     * @param tableType the row type of the table.
+     * @param selectedFields the indexes of the fields to read from the ORC format.
+     * @param conjunctPredicates the filter predicates that can be evaluated.
+     * @param batchSize the batch size of the ORC reader.
+ */
+ public OrcReaderFactory(
+ final org.apache.hadoop.conf.Configuration hadoopConfig,
+ final RowType tableType,
+ final int[] selectedFields,
+ final List<OrcFilters.Predicate> conjunctPredicates,
+ final int batchSize) {
+ this.hadoopConfigWrapper = new SerializableHadoopConfigWrapper(checkNotNull(hadoopConfig));
+ this.schema = logicalTypeToOrcType(tableType);
+ this.tableType = tableType;
+ this.selectedFields = checkNotNull(selectedFields);
+ this.conjunctPredicates = checkNotNull(conjunctPredicates);
+ this.batchSize = batchSize;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public OrcVectorizedReader createReader(
+ final org.apache.flink.configuration.Configuration config, final FileSourceSplit split)
+ throws IOException {
+
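+        // pool as many reusable batches as the source reader's element queue can hold, so that
+        // batch decoding can run ahead of downstream consumption without unbounded allocation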
+ final int numBatchesToCirculate =
+ config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
+ final Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(numBatchesToCirculate);
+
+ final RecordReader orcReader =
+ createRecordReader(
+ hadoopConfigWrapper.getHadoopConfig(),
+ schema,
+ selectedFields,
+ conjunctPredicates,
+ split.path(),
+ split.offset(),
+ split.length());
+
+ return new OrcVectorizedReader(orcReader, poolOfBatches);
+ }
+
+ @Override
+ public OrcVectorizedReader restoreReader(
+ final org.apache.flink.configuration.Configuration config,
+ final FileSourceSplit split) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
+
+ /**
+ * Creates the {@link OrcReaderBatch} structure, which is responsible for holding the data
+ * structures that hold the batch data (column vectors, row arrays, ...) and the batch
+ * conversion from the ORC representation to the result format.
+ */
+ public OrcReaderBatch createReaderBatch(
+ VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) {
+ List<String> tableFieldNames = tableType.getFieldNames();
+ List<LogicalType> tableFieldTypes = tableType.getChildren();
+
+ // create and initialize the row batch
+ ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+ for (int i = 0; i < vectors.length; i++) {
+ String name = tableFieldNames.get(selectedFields[i]);
+ LogicalType type = tableFieldTypes.get(selectedFields[i]);
+ vectors[i] = createFlinkVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+ }
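+        // the Flink vectors wrap the ORC vectors directly, so no per-batch data copy is needed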
+ VectorizedColumnBatch flinkColumnBatch = new VectorizedColumnBatch(vectors);
+ return new OrcReaderBatch(orcBatch, flinkColumnBatch, recycler);
+ }
+
+ /** Gets the type produced by this format. */
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------------------------------------------------------------------
+
+ private Pool<OrcReaderBatch> createPoolOfBatches(int numBatches) {
+ final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);
+
+ for (int i = 0; i < numBatches; i++) {
+ final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize);
+ final OrcReaderBatch batch = createReaderBatch(orcBatch, pool.recycler());
+ pool.add(batch);
+ }
+
+ return pool;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The {@code OrcReaderBatch} class holds the data structures containing the batch data (column
+ * vectors, row arrays, ...) and performs the batch conversion from the ORC representation to
+ * the result format.
+ *
+     * <p>This base class only holds the ORC column vectors; subclasses additionally hold the
+     * result structures and implement the conversion in {@link
+ * OrcReaderBatch#convertAndGetIterator(VectorizedRowBatch, long)}.
+ */
+ protected static class OrcReaderBatch {
+
+ private final VectorizedRowBatch orcVectorizedRowBatch;
+ private final Pool.Recycler<OrcReaderBatch> recycler;
+
+ private final VectorizedColumnBatch flinkColumnBatch;
+ private final ColumnarRowIterator result;
+
+ protected OrcReaderBatch(
+ final VectorizedRowBatch orcVectorizedRowBatch,
+ final VectorizedColumnBatch flinkColumnBatch,
+ final Pool.Recycler<OrcReaderBatch> recycler) {
+ this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
+ this.recycler = checkNotNull(recycler);
+ this.flinkColumnBatch = flinkColumnBatch;
+ this.result =
+ new ColumnarRowIterator(new ColumnarRowData(flinkColumnBatch), this::recycle);
+ }
+
+ /**
+ * Puts this batch back into the pool. This should be called after all records from the
+ * batch have been returned, typically in the {@link RecordIterator#releaseBatch()} method.
+ */
+ public void recycle() {
+ recycler.recycle(this);
+ }
+
+ /** Gets the ORC VectorizedRowBatch structure from this batch. */
+ public VectorizedRowBatch orcVectorizedRowBatch() {
+ return orcVectorizedRowBatch;
+ }
+
+ /**
+ * Converts the ORC VectorizedRowBatch into the result structure and returns an iterator
+ * over the entries.
+ *
+         * <p>This method may, for example, return a single-element iterator that returns the
+         * entire batch as one record, or (as another example) return an iterator over the rows
+         * projected from this column batch.
+ *
+ * <p>The position information in the result needs to be constructed as follows: The value
+ * of {@code startingOffset} is the offset value ({@link RecordAndPosition#getOffset()}) for
+ * all rows in the batch. Each row then increments the records-to-skip value ({@link
+ * RecordAndPosition#getRecordSkipCount()}).
+ */
+ private RecordIterator<RowData> convertAndGetIterator(
+ final VectorizedRowBatch orcBatch, final long startingOffset) {
+            // no copying from the ORC column vectors to the Flink column vectors is necessary,
+            // because they point to the same data arrays internally
+ int batchSize = orcBatch.size;
+ flinkColumnBatch.setNumRows(batchSize);
+ result.set(batchSize, startingOffset, 0);
+ return result;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A vectorized ORC reader. This reader reads an ORC Batch at a time and converts it to one or
+ * more records to be returned. An ORC Row-wise reader would convert the batch into a set of
+ * rows, while a reader for a vectorized query processor might return the whole batch as one
+ * record.
+ *
+ * <p>The conversion of the {@code VectorizedRowBatch} happens in the specific {@link
+ * OrcReaderBatch} implementation.
+ *
+ * <p>The reader tracks its current position using ORC's <i>row numbers</i>. Each record in a
+     * batch is addressed by the starting row number of the batch, plus the number of records to
+     * skip before it.
+ */
+ private static final class OrcVectorizedReader implements BulkFormat.Reader<RowData> {
+
+ private final RecordReader orcReader;
+ private final Pool<OrcReaderBatch> pool;
+
+ private OrcVectorizedReader(final RecordReader orcReader, final Pool<OrcReaderBatch> pool) {
+ this.orcReader = checkNotNull(orcReader, "orcReader");
+ this.pool = checkNotNull(pool, "pool");
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<RowData> readBatch() throws IOException {
+ final OrcReaderBatch batch = getCachedEntry();
+ final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();
+
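+            // the ORC row number before the read marks where this batch starts; it becomes the
+            // offset recorded in the returned record positions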
+ final long orcRowNumber = orcReader.getRowNumber();
+ if (!nextBatch(orcReader, orcVectorBatch)) {
+ batch.recycle();
+ return null;
+ }
+
+ return batch.convertAndGetIterator(orcVectorBatch, orcRowNumber);
+ }
+
+ @Override
+ public void close() throws IOException {
+ orcReader.close();
+ }
+
+ private OrcReaderBatch getCachedEntry() throws IOException {
+ try {
+ return pool.pollEntry();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted");
+ }
+ }
+ }
+
+ private static RecordReader createRecordReader(
+ org.apache.hadoop.conf.Configuration conf,
+ TypeDescription schema,
+ int[] selectedFields,
+ List<OrcFilters.Predicate> conjunctPredicates,
+ org.apache.flink.core.fs.Path path,
+ long splitStart,
+ long splitLength)
+ throws IOException {
+ org.apache.orc.Reader orcReader = createReader(conf, path);
+
+ // get offset and length for the stripes that start in the split
+ Tuple2<Long, Long> offsetAndLength =
+ getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
+
+ // create ORC row reader configuration
+ org.apache.orc.Reader.Options options =
+ new org.apache.orc.Reader.Options()
+ .schema(schema)
+ .range(offsetAndLength.f0, offsetAndLength.f1)
+ .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+ .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+ .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+ // configure filters
+ if (!conjunctPredicates.isEmpty()) {
+ SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+ b = b.startAnd();
+ for (OrcFilters.Predicate predicate : conjunctPredicates) {
+ predicate.add(b);
+ }
+ b = b.end();
+ options.searchArgument(b.build(), new String[] {});
+ }
+
+ // configure selected fields
+ options.include(computeProjectionMask(schema, selectedFields));
+
+ // create ORC row reader
+ RecordReader orcRowsReader = orcReader.rows(options);
+
+        // calling getId() forces the lazy assignment of column ids on the schema
+ schema.getId();
+
+ return orcRowsReader;
+ }
+
+ private static VectorizedRowBatch createBatchWrapper(TypeDescription schema, int batchSize) {
+ return schema.createRowBatch(batchSize);
+ }
+
+ private static boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch)
+ throws IOException {
+ return reader.nextBatch(rowBatch);
+ }
+
+ private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
+ long splitStart, long splitLength, List<StripeInformation> stripes) {
+ long splitEnd = splitStart + splitLength;
+ long readStart = Long.MAX_VALUE;
+ long readEnd = Long.MIN_VALUE;
+
+ for (StripeInformation s : stripes) {
+ if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+ // stripe starts in split, so it is included
+ readStart = Math.min(readStart, s.getOffset());
+ readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+ }
+ }
+
+ if (readStart < Long.MAX_VALUE) {
+            // at least one stripe is included
+ return Tuple2.of(readStart, readEnd - readStart);
+ } else {
+ return Tuple2.of(0L, 0L);
+ }
+ }
+
+ /**
+     * Computes the ORC projection mask of the fields to include from the selected fields.
+ *
+ * @return The ORC projection mask.
+ */
+ private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
+ // mask with all fields of the schema
+ boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+ // for each selected field
+ for (int inIdx : selectedFields) {
+ // set all nested fields of a selected field to true
+ TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+ for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+ projectionMask[i] = true;
+ }
+ }
+ return projectionMask;
+ }
+
+ public static org.apache.orc.Reader createReader(
+ org.apache.hadoop.conf.Configuration conf, org.apache.flink.core.fs.Path path)
+ throws IOException {
+ // open ORC file and create reader
+ org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri());
+
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+
+ // configure filesystem from Flink filesystem
+ readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
+
+ return OrcFile.createReader(hPath, readerOptions);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
deleted file mode 100644
index 69fd951e..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.shim.OrcShim;
-import org.apache.flink.orc.vector.HiveOrcBatchWrapper;
-import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A {@link OrcShim} for table store.
- *
- * <p>This is copied from flink-orc except filesystem setting.
- */
-public class OrcShimImpl implements OrcShim<VectorizedRowBatch> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public RecordReader createRecordReader(
- Configuration conf,
- TypeDescription schema,
- int[] selectedFields,
- List<OrcFilters.Predicate> conjunctPredicates,
- org.apache.flink.core.fs.Path path,
- long splitStart,
- long splitLength)
- throws IOException {
- Reader orcReader = createReader(conf, path);
-
- // get offset and length for the stripes that start in the split
- Tuple2<Long, Long> offsetAndLength =
- getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
-
- // create ORC row reader configuration
- Reader.Options options =
- new Reader.Options()
- .schema(schema)
- .range(offsetAndLength.f0, offsetAndLength.f1)
- .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
- .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
- .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
- // configure filters
- if (!conjunctPredicates.isEmpty()) {
- SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
- b = b.startAnd();
- for (OrcFilters.Predicate predicate : conjunctPredicates) {
- predicate.add(b);
- }
- b = b.end();
- options.searchArgument(b.build(), new String[] {});
- }
-
- // configure selected fields
- options.include(computeProjectionMask(schema, selectedFields));
-
- // create ORC row reader
- RecordReader orcRowsReader = orcReader.rows(options);
-
- // assign ids
- schema.getId();
-
- return orcRowsReader;
- }
-
- @Override
- public HiveOrcBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize) {
- return new HiveOrcBatchWrapper(schema.createRowBatch(batchSize));
- }
-
- @Override
- public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException {
- return reader.nextBatch(rowBatch);
- }
-
- private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
- long splitStart, long splitLength, List<StripeInformation> stripes) {
- long splitEnd = splitStart + splitLength;
- long readStart = Long.MAX_VALUE;
- long readEnd = Long.MIN_VALUE;
-
- for (StripeInformation s : stripes) {
- if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
- // stripe starts in split, so it is included
- readStart = Math.min(readStart, s.getOffset());
- readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
- }
- }
-
- if (readStart < Long.MAX_VALUE) {
- // at least one split is included
- return Tuple2.of(readStart, readEnd - readStart);
- } else {
- return Tuple2.of(0L, 0L);
- }
- }
-
- /**
- * Computes the ORC projection mask of the fields to include from the selected
- * fields.rowOrcInputFormat.nextRecord(null).
- *
- * @return The ORC projection mask.
- */
- private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
- // mask with all fields of the schema
- boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
- // for each selected field
- for (int inIdx : selectedFields) {
- // set all nested fields of a selected field to true
- TypeDescription fieldSchema = schema.getChildren().get(inIdx);
- for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
- projectionMask[i] = true;
- }
- }
- return projectionMask;
- }
-
- public static Reader createReader(Configuration conf, org.apache.flink.core.fs.Path path)
- throws IOException {
- // open ORC file and create reader
- Path hPath = new Path(path.toUri());
-
- OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
-
- // configure filesystem from Flink filesystem
- readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
-
- return OrcFile.createReader(hPath, readerOptions);
- }
-}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java
new file mode 100644
index 00000000..34960bd9
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcWriterFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.table.store.format.orc.writer.OrcBulkWriter;
+import org.apache.flink.table.store.format.orc.writer.PhysicalWriterImpl;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcFile;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates an ORC {@link BulkWriter}. The factory takes a user-supplied {@link
+ * Vectorizer} implementation to convert the element into an {@link
+ * org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}.
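+ *
+ * <p>A minimal usage sketch (illustrative only; {@code schema} is assumed to be the ORC {@code
+ * TypeDescription}, {@code fieldTypes} the logical types of the row fields, and {@code
+ * outputStream} an open {@code FSDataOutputStream}):
+ *
+ * <pre>{@code
+ * Vectorizer<RowData> vectorizer = new RowDataVectorizer(schema.toString(), fieldTypes);
+ * OrcWriterFactory<RowData> factory = new OrcWriterFactory<>(vectorizer);
+ * BulkWriter<RowData> writer = factory.create(outputStream);
+ * }</pre>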
+ *
+ * @param <T> The type of element to write.
+ */
+@PublicEvolving
+public class OrcWriterFactory<T> implements BulkWriter.Factory<T> {
+
+ private final Vectorizer<T> vectorizer;
+ private final Properties writerProperties;
+ private final Map<String, String> confMap;
+
+ private OrcFile.WriterOptions writerOptions;
+
+ /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer implementation.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+ */
+ public OrcWriterFactory(Vectorizer<T> vectorizer) {
+ this(vectorizer, new Configuration());
+ }
+
+ /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer and Hadoop Configuration.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     * @param configuration The Hadoop configuration for the ORC writer.
+ */
+ public OrcWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {
+ this(vectorizer, null, configuration);
+ }
+
+ /**
+     * Creates a new OrcWriterFactory using the provided Vectorizer, Hadoop Configuration, and ORC
+     * writer properties.
+     *
+     * @param vectorizer The vectorizer implementation to convert an input record to a
+     *     VectorizedRowBatch.
+     * @param writerProperties Properties that can be used in ORC WriterOptions.
+     * @param configuration The Hadoop configuration for the ORC writer.
+ */
+ public OrcWriterFactory(
+ Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {
+ this.vectorizer = checkNotNull(vectorizer);
+ this.writerProperties = writerProperties;
+ this.confMap = new HashMap<>();
+
+        // TODO: Replace the Map-based approach with a better approach
+ for (Map.Entry<String, String> entry : configuration) {
+ confMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+ OrcFile.WriterOptions opts = getWriterOptions();
+ opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+        // The path of the Writer is not used to indicate the destination file
+        // in this case since we have used a dedicated physical writer to write
+        // to the given output stream directly. However, the path is used as
+        // the key of the writer in the ORC memory manager, so we need to make it unique.
+ Path unusedPath = new Path(UUID.randomUUID().toString());
+ return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
+ }
+
+ @VisibleForTesting
+ protected OrcFile.WriterOptions getWriterOptions() {
+ if (null == writerOptions) {
+ Configuration conf = new ThreadLocalClassLoaderConfiguration();
+ for (Map.Entry<String, String> entry : confMap.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ writerOptions = OrcFile.writerOptions(writerProperties, conf);
+ writerOptions.setSchema(this.vectorizer.getSchema());
+ }
+
+ return writerOptions;
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java
new file mode 100644
index 00000000..4a770ba0
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/SerializableHadoopConfigWrapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utility class to make a {@link Configuration Hadoop Configuration} serializable. */
+public final class SerializableHadoopConfigWrapper implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Configuration hadoopConfig;
+
+ public SerializableHadoopConfigWrapper(Configuration hadoopConfig) {
+ this.hadoopConfig = checkNotNull(hadoopConfig);
+ }
+
+ public Configuration getHadoopConfig() {
+ return hadoopConfig;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+
+        // we write the Hadoop config through a separate serializer to avoid cryptic exceptions
+        // when it corrupts the serialization stream
+ final DataOutputSerializer ser = new DataOutputSerializer(256);
+ hadoopConfig.write(ser);
+ out.writeInt(ser.length());
+ out.write(ser.getSharedBuffer(), 0, ser.length());
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
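+        // read back the length-prefixed payload written in writeObject and decode it with
+        // Hadoop's own readFields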
+ final byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ final DataInputDeserializer deser = new DataInputDeserializer(data);
+ this.hadoopConfig = new Configuration();
+
+ try {
+ this.hadoopConfig.readFields(deser);
+ } catch (IOException e) {
+ throw new IOException(
+ "Could not deserialize Hadoop Configuration, the serialized and de-serialized don't match.",
+ e);
+ }
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java
new file mode 100644
index 00000000..033f4edd
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/ThreadLocalClassLoaderConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URL;
+
+/**
+ * Workaround for https://issues.apache.org/jira/browse/ORC-653.
+ *
+ * <p>Since the conf is effectively cached across Flink jobs, at least force the thread-local
+ * classloader to avoid classloader leaks.
+ */
+@Internal
+public final class ThreadLocalClassLoaderConfiguration extends Configuration {
+ public ThreadLocalClassLoaderConfiguration() {}
+
+ public ThreadLocalClassLoaderConfiguration(Configuration other) {
+ super(other);
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return Thread.currentThread().getContextClassLoader();
+ }
+
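+    // all lookups below resolve against the thread-local context classloader instead of the
+    // classloader Hadoop captured when the Configuration was created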
+ @Override
+ public Class<?> getClassByNameOrNull(String name) {
+ try {
+ return Class.forName(name, true, getClassLoader());
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public URL getResource(String name) {
+ return getClassLoader().getResource(name);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
similarity index 97%
rename from flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
index 2cd7ac20..36dc0fa5 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFileStatsExtractor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.format.orc;
+package org.apache.flink.table.store.format.orc.filter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.DecimalData;
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.store.format.orc.OrcReaderFactory;
import org.apache.flink.table.store.utils.DateTimeUtils;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
@@ -57,7 +58,7 @@ public class OrcFileStatsExtractor implements FileStatsExtractor {
@Override
public FieldStats[] extract(Path path) throws IOException {
- try (Reader reader = OrcShimImpl.createReader(new Configuration(), path)) {
+ try (Reader reader = OrcReaderFactory.createReader(new Configuration(), path)) {
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
TypeDescription schema = reader.getSchema();
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
new file mode 100644
index 00000000..8863c316
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.filter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.store.utils.MapBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/** Utility class that provides helper methods to work with Orc Filter PushDown. */
+public class OrcFilters {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OrcFilters.class);
+
+ private static final Map<FunctionDefinition, Function<CallExpression, Predicate>> FILTERS =
+ new MapBuilder<FunctionDefinition, Function<CallExpression, Predicate>>()
+ .put(BuiltInFunctionDefinitions.IS_NULL, OrcFilters::convertIsNull)
+ .put(BuiltInFunctionDefinitions.IS_NOT_NULL, OrcFilters::convertIsNotNull)
+ .put(BuiltInFunctionDefinitions.NOT, OrcFilters::convertNot)
+ .put(BuiltInFunctionDefinitions.OR, OrcFilters::convertOr)
+ .put(
+ BuiltInFunctionDefinitions.EQUALS,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertEquals,
+ OrcFilters::convertEquals))
+ .put(
+ BuiltInFunctionDefinitions.NOT_EQUALS,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertNotEquals,
+ OrcFilters::convertNotEquals))
+ .put(
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertGreaterThan,
+ OrcFilters::convertLessThanEquals))
+ .put(
+ BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertGreaterThanEquals,
+ OrcFilters::convertLessThan))
+ .put(
+ BuiltInFunctionDefinitions.LESS_THAN,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertLessThan,
+ OrcFilters::convertGreaterThanEquals))
+ .put(
+ BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+ call ->
+ convertBinary(
+ call,
+ OrcFilters::convertLessThanEquals,
+ OrcFilters::convertGreaterThan))
+ .unmodifiable();
+
+ private static boolean isRef(Expression expression) {
+ return expression instanceof FieldReferenceExpression;
+ }
+
+ private static boolean isLit(Expression expression) {
+ return expression instanceof ValueLiteralExpression;
+ }
+
+ private static boolean isUnaryValid(CallExpression callExpression) {
+ return callExpression.getChildren().size() == 1
+ && isRef(callExpression.getChildren().get(0));
+ }
+
+ private static boolean isBinaryValid(CallExpression callExpression) {
+ return callExpression.getChildren().size() == 2
+ && (isRef(callExpression.getChildren().get(0))
+ && isLit(callExpression.getChildren().get(1))
+ || isLit(callExpression.getChildren().get(0))
+ && isRef(callExpression.getChildren().get(1)));
+ }
+
+ private static Predicate convertIsNull(CallExpression callExp) {
+ if (!isUnaryValid(callExp)) {
+ // not a valid predicate
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ callExp);
+ return null;
+ }
+
+ PredicateLeaf.Type colType =
+ toOrcType(
+ ((FieldReferenceExpression) callExp.getChildren().get(0))
+ .getOutputDataType());
+ if (colType == null) {
+ // unsupported type
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ callExp);
+ return null;
+ }
+
+ String colName = getColumnName(callExp);
+
+ return new IsNull(colName, colType);
+ }
+
+ private static Predicate convertIsNotNull(CallExpression callExp) {
+ return new Not(convertIsNull(callExp));
+ }
+
+ private static Predicate convertNot(CallExpression callExp) {
+ if (callExp.getChildren().size() != 1) {
+ // not a valid predicate
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ callExp);
+ return null;
+ }
+
+ Predicate c = toOrcPredicate(callExp.getChildren().get(0));
+ return c == null ? null : new Not(c);
+ }
+
+ private static Predicate convertOr(CallExpression callExp) {
+ if (callExp.getChildren().size() < 2) {
+ return null;
+ }
+ Expression left = callExp.getChildren().get(0);
+ Expression right = callExp.getChildren().get(1);
+
+ Predicate c1 = toOrcPredicate(left);
+ Predicate c2 = toOrcPredicate(right);
+ if (c1 == null || c2 == null) {
+ return null;
+ } else {
+ return new Or(c1, c2);
+ }
+ }
+
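+    // "func" is applied when the literal is on the right-hand side of the comparison and
+    // "reverseFunc" when it is on the left-hand side.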
+ public static Predicate convertBinary(
+ CallExpression callExp,
+ TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> func,
+ TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> reverseFunc) {
+ if (!isBinaryValid(callExp)) {
+ // not a valid predicate
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ callExp);
+ return null;
+ }
+
+ PredicateLeaf.Type litType = getLiteralType(callExp);
+ if (litType == null) {
+ // unsupported literal type
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ callExp);
+ return null;
+ }
+
+ String colName = getColumnName(callExp);
+
+ // fetch literal and ensure it is serializable
+ Object literalObj = getLiteral(callExp).get();
+ Object orcObj = toOrcObject(litType, literalObj);
+ Serializable literal;
+ // validate that literal is serializable
+ if (orcObj instanceof Serializable) {
+ literal = (Serializable) orcObj;
+ } else {
+ LOG.warn(
+ "Encountered a non-serializable literal of type {}. "
+ + "Cannot push predicate [{}] into OrcFileSystemFormatFactory. "
+ + "This is a bug and should be reported.",
+ literalObj.getClass().getCanonicalName(),
+ callExp);
+ return null;
+ }
+
+ return literalOnRight(callExp)
+ ? func.apply(colName, litType, literal)
+ : reverseFunc.apply(colName, litType, literal);
+ }
+
+ private static Predicate convertEquals(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new Equals(colName, litType, literal);
+ }
+
+ private static Predicate convertNotEquals(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new Not(convertEquals(colName, litType, literal));
+ }
+
+ private static Predicate convertGreaterThan(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new Not(new LessThanEquals(colName, litType, literal));
+ }
+
+ private static Predicate convertGreaterThanEquals(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new Not(new LessThan(colName, litType, literal));
+ }
+
+ private static Predicate convertLessThan(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new LessThan(colName, litType, literal);
+ }
+
+ private static Predicate convertLessThanEquals(
+ String colName, PredicateLeaf.Type litType, Serializable literal) {
+ return new LessThanEquals(colName, litType, literal);
+ }
+
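+    // Illustrative sketch: "a IS NULL" converts to new IsNull("a", <orc type>); "a = 1" on a
+    // LONG column roughly to new Equals("a", PredicateLeaf.Type.LONG, 1). Expressions that
+    // cannot be converted yield null and are simply not pushed down.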
+ public static Predicate toOrcPredicate(Expression expression) {
+ if (expression instanceof CallExpression) {
+ CallExpression callExp = (CallExpression) expression;
+ if (FILTERS.get(callExp.getFunctionDefinition()) == null) {
+ // unsupported predicate
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ expression);
+ return null;
+ }
+ return FILTERS.get(callExp.getFunctionDefinition()).apply(callExp);
+ } else {
+ // unsupported predicate
+ LOG.debug(
+ "Unsupported predicate [{}] cannot be pushed into OrcFileSystemFormatFactory.",
+ expression);
+ return null;
+ }
+ }
+
+ private static String getColumnName(CallExpression comp) {
+ if (literalOnRight(comp)) {
+ return ((FieldReferenceExpression) comp.getChildren().get(0)).getName();
+ } else {
+ return ((FieldReferenceExpression) comp.getChildren().get(1)).getName();
+ }
+ }
+
+ private static boolean literalOnRight(CallExpression comp) {
+ if (comp.getChildren().size() == 1
+ && comp.getChildren().get(0) instanceof FieldReferenceExpression) {
+ return true;
+ } else if (isLit(comp.getChildren().get(0)) && isRef(comp.getChildren().get(1))) {
+ return false;
+ } else if (isRef(comp.getChildren().get(0)) && isLit(comp.getChildren().get(1))) {
+ return true;
+ } else {
+ throw new RuntimeException("Invalid binary comparison.");
+ }
+ }
+
+ private static PredicateLeaf.Type getLiteralType(CallExpression comp) {
+ if (literalOnRight(comp)) {
+ return toOrcType(
+ ((ValueLiteralExpression) comp.getChildren().get(1)).getOutputDataType());
+ } else {
+ return toOrcType(
+ ((ValueLiteralExpression) comp.getChildren().get(0)).getOutputDataType());
+ }
+ }
+
+ private static Object toOrcObject(PredicateLeaf.Type litType, Object literalObj) {
+ switch (litType) {
+ case DATE:
+ if (literalObj instanceof LocalDate) {
+ LocalDate localDate = (LocalDate) literalObj;
+ return Date.valueOf(localDate);
+ } else {
+ return literalObj;
+ }
+ case TIMESTAMP:
+ if (literalObj instanceof LocalDateTime) {
+ LocalDateTime localDateTime = (LocalDateTime) literalObj;
+ return Timestamp.valueOf(localDateTime);
+ } else {
+ return literalObj;
+ }
+ default:
+ return literalObj;
+ }
+ }
+
+ private static Optional<?> getLiteral(CallExpression comp) {
+ if (literalOnRight(comp)) {
+ ValueLiteralExpression valueLiteralExpression =
+ (ValueLiteralExpression) comp.getChildren().get(1);
+ return valueLiteralExpression.getValueAs(
+ valueLiteralExpression.getOutputDataType().getConversionClass());
+ } else {
+ ValueLiteralExpression valueLiteralExpression =
+ (ValueLiteralExpression) comp.getChildren().get(0);
+ return valueLiteralExpression.getValueAs(
+ valueLiteralExpression.getOutputDataType().getConversionClass());
+ }
+ }
+
+ private static PredicateLeaf.Type toOrcType(DataType type) {
+ LogicalTypeRoot ltype = type.getLogicalType().getTypeRoot();
+ switch (ltype) {
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ return PredicateLeaf.Type.LONG;
+ case FLOAT:
+ case DOUBLE:
+ return PredicateLeaf.Type.FLOAT;
+ case BOOLEAN:
+ return PredicateLeaf.Type.BOOLEAN;
+ case CHAR:
+ case VARCHAR:
+ return PredicateLeaf.Type.STRING;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return PredicateLeaf.Type.TIMESTAMP;
+ case DATE:
+ return PredicateLeaf.Type.DATE;
+ case DECIMAL:
+ return PredicateLeaf.Type.DECIMAL;
+ default:
+ return null;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Classes to define predicates
+ // --------------------------------------------------------------------------------------------
+
+ /** A filter predicate that can be evaluated by the OrcInputFormat. */
+ public abstract static class Predicate implements Serializable {
+ public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
+ }
+
+ abstract static class ColumnPredicate extends Predicate {
+ final String columnName;
+ final PredicateLeaf.Type literalType;
+
+ ColumnPredicate(String columnName, PredicateLeaf.Type literalType) {
+ this.columnName = columnName;
+ this.literalType = literalType;
+ }
+
+ Object castLiteral(Serializable literal) {
+
+ switch (literalType) {
+ case LONG:
+ if (literal instanceof Byte) {
+ return new Long((Byte) literal);
+ } else if (literal instanceof Short) {
+ return new Long((Short) literal);
+ } else if (literal instanceof Integer) {
+ return new Long((Integer) literal);
+ } else if (literal instanceof Long) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a LONG column requires an integer "
+ + "literal, i.e., Byte, Short, Integer, or Long.");
+ }
+ case FLOAT:
+ if (literal instanceof Float) {
+ return new Double((Float) literal);
+ } else if (literal instanceof Double) {
+ return literal;
+ } else if (literal instanceof BigDecimal) {
+ return ((BigDecimal) literal).doubleValue();
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a FLOAT column requires a floating "
+ + "literal, i.e., Float or Double.");
+ }
+ case STRING:
+ if (literal instanceof String) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException(
+                                "A predicate on a STRING column requires a String literal.");
+ }
+ case BOOLEAN:
+ if (literal instanceof Boolean) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a BOOLEAN column requires a Boolean literal.");
+ }
+ case DATE:
+ if (literal instanceof Date) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a DATE column requires a java.sql.Date literal.");
+ }
+ case TIMESTAMP:
+ if (literal instanceof Timestamp) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a TIMESTAMP column requires a java.sql.Timestamp literal.");
+ }
+ case DECIMAL:
+ if (literal instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal));
+ } else {
+ throw new IllegalArgumentException(
+ "A predicate on a DECIMAL column requires a BigDecimal literal.");
+ }
+ default:
+ throw new IllegalArgumentException("Unknown literal type " + literalType);
+ }
+ }
+ }
+
+ abstract static class BinaryPredicate extends ColumnPredicate {
+ final Serializable literal;
+
+ BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType);
+ this.literal = literal;
+ }
+ }
+
+ /** An EQUALS predicate that can be evaluated by the OrcInputFormat. */
+ public static class Equals extends BinaryPredicate {
+ /**
+ * Creates an EQUALS predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.equals(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " = " + literal;
+ }
+ }
+
+ /** A LESS_THAN predicate that can be evaluated by the OrcInputFormat. */
+ public static class LessThan extends BinaryPredicate {
+ /**
+ * Creates a LESS_THAN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.lessThan(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " < " + literal;
+ }
+ }
+
+ /** A LESS_THAN_EQUALS predicate that can be evaluated by the OrcInputFormat. */
+ public static class LessThanEquals extends BinaryPredicate {
+ /**
+ * Creates a LESS_THAN_EQUALS predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public LessThanEquals(
+ String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.lessThanEquals(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " <= " + literal;
+ }
+ }
+
+ /** An IS_NULL predicate that can be evaluated by the OrcInputFormat. */
+ public static class IsNull extends ColumnPredicate {
+ /**
+ * Creates an IS_NULL predicate.
+ *
+ * @param columnName The column to check for null.
+ * @param literalType The type of the column to check for null.
+ */
+ public IsNull(String columnName, PredicateLeaf.Type literalType) {
+ super(columnName, literalType);
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.isNull(columnName, literalType);
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " IS NULL";
+ }
+ }
+
+ /** A BETWEEN predicate that can be evaluated by the OrcInputFormat. */
+ public static class Between extends ColumnPredicate {
+ private final Serializable lowerBound;
+ private final Serializable upperBound;
+
+ /**
+ * Creates a BETWEEN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literals.
+ * @param lowerBound The literal value of the (inclusive) lower bound to check the column
+ * against.
+ * @param upperBound The literal value of the (inclusive) upper bound to check the column
+ * against.
+ */
+ public Between(
+ String columnName,
+ PredicateLeaf.Type literalType,
+ Serializable lowerBound,
+ Serializable upperBound) {
+ super(columnName, literalType);
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.between(
+ columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound));
+ }
+
+ @Override
+ public String toString() {
+ return lowerBound + " <= " + columnName + " <= " + upperBound;
+ }
+ }
+
+ /** An IN predicate that can be evaluated by the OrcInputFormat. */
+ public static class In extends ColumnPredicate {
+ private final Serializable[] literals;
+
+ /**
+ * Creates an IN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literals.
+ * @param literals The literal values to check the column against.
+ */
+ public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals) {
+ super(columnName, literalType);
+ this.literals = literals;
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ Object[] castedLiterals = new Object[literals.length];
+ for (int i = 0; i < literals.length; i++) {
+ castedLiterals[i] = castLiteral(literals[i]);
+ }
+ return builder.in(columnName, literalType, castedLiterals);
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " IN " + Arrays.toString(literals);
+ }
+ }
+
+ /** A NOT predicate to negate a predicate that can be evaluated by the OrcInputFormat. */
+ public static class Not extends Predicate {
+ private final Predicate pred;
+
+ /**
+ * Creates a NOT predicate.
+ *
+ * @param predicate The predicate to negate.
+ */
+ public Not(Predicate predicate) {
+ this.pred = predicate;
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return pred.add(builder.startNot()).end();
+ }
+
+ @Override
+ public String toString() {
+ return "NOT(" + pred.toString() + ")";
+ }
+ }
+
+ /** An OR predicate that can be evaluated by the OrcInputFormat. */
+ public static class Or extends Predicate {
+ private final Predicate[] preds;
+
+ /**
+ * Creates an OR predicate.
+ *
+ * @param predicates The disjunctive predicates.
+ */
+ public Or(Predicate... predicates) {
+ this.preds = predicates;
+ }
+
+ @Override
+ public SearchArgument.Builder add(SearchArgument.Builder builder) {
+ SearchArgument.Builder withOr = builder.startOr();
+ for (Predicate p : preds) {
+ withOr = p.add(withOr);
+ }
+ return withOr.end();
+ }
+
+ @Override
+ public String toString() {
+ return "OR(" + Arrays.toString(preds) + ")";
+ }
+ }
+}
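For reference, the predicate classes above compose into an ORC SearchArgument
through their add(builder) methods. A minimal sketch of that composition (the
column names, types, and literals below are purely illustrative):

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

    // Build the equivalent of: WHERE id = 42 AND name IS NOT NULL
    SearchArgument.Builder builder = SearchArgumentFactory.newBuilder().startAnd();
    builder = new OrcFilters.Equals("id", PredicateLeaf.Type.LONG, 42L).add(builder);
    builder =
            new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING))
                    .add(builder);
    SearchArgument sarg = builder.end().build();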
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
similarity index 98%
rename from flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java
rename to flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
index 8e93a03c..ee29d5e2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcPredicateFunctionVisitor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.format.orc;
+package org.apache.flink.table.store.format.orc.filter;
-import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.store.file.predicate.FieldRef;
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java
new file mode 100644
index 00000000..61e771be
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/AbstractOrcColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+/** This column vector is used to adapt Hive's ColumnVector to Flink's ColumnVector. */
+public abstract class AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.ColumnVector {
+
+ private final ColumnVector vector;
+
+ AbstractOrcColumnVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
+ }
+
+ public static org.apache.flink.table.store.data.columnar.ColumnVector createFlinkVector(
+ ColumnVector vector, LogicalType logicalType) {
+ if (vector instanceof LongColumnVector) {
+ if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+ return new OrcLegacyTimestampColumnVector((LongColumnVector) vector);
+ } else {
+ return new OrcLongColumnVector((LongColumnVector) vector);
+ }
+ } else if (vector instanceof DoubleColumnVector) {
+ return new OrcDoubleColumnVector((DoubleColumnVector) vector);
+ } else if (vector instanceof BytesColumnVector) {
+ return new OrcBytesColumnVector((BytesColumnVector) vector);
+ } else if (vector instanceof DecimalColumnVector) {
+ return new OrcDecimalColumnVector((DecimalColumnVector) vector);
+ } else if (vector instanceof TimestampColumnVector) {
+ return new OrcTimestampColumnVector(vector);
+ } else if (vector instanceof ListColumnVector) {
+ return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) logicalType);
+ } else if (vector instanceof StructColumnVector) {
+ return new OrcRowColumnVector((StructColumnVector) vector, (RowType) logicalType);
+ } else if (vector instanceof MapColumnVector) {
+ return new OrcMapColumnVector((MapColumnVector) vector, (MapType) logicalType);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported vector: " + vector.getClass().getName());
+ }
+ }
+}
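As a rough illustration of how these adapters are used on the read path (the
actual wiring in this commit lives in OrcReaderFactory; 'orcSchema' and
'rowType' below are assumed to be the file's TypeDescription and the matching
Flink RowType):

    import org.apache.flink.table.store.data.columnar.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    // Wrap each Hive vector of an ORC row batch in its Flink-side adapter.
    VectorizedRowBatch batch = orcSchema.createRowBatch();
    ColumnVector[] vectors = new ColumnVector[batch.cols.length];
    for (int i = 0; i < batch.cols.length; i++) {
        vectors[i] =
                AbstractOrcColumnVector.createFlinkVector(batch.cols[i], rowType.getTypeAt(i));
    }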
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java
new file mode 100644
index 00000000..dacc8677
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcArrayColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+
+/** This column vector is used to adapt Hive's ListColumnVector to Flink's ArrayColumnVector. */
+public class OrcArrayColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.ArrayColumnVector {
+
+ private final ListColumnVector hiveVector;
+ private final ColumnVector flinkVector;
+
+ public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
+ super(hiveVector);
+ this.hiveVector = hiveVector;
+ this.flinkVector = createFlinkVector(hiveVector.child, type.getElementType());
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ long offset = hiveVector.offsets[i];
+ long length = hiveVector.lengths[i];
+ return new ColumnarArrayData(flinkVector, (int) offset, (int) length);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java
new file mode 100644
index 00000000..a2ef3ecf
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcBytesColumnVector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+
+/** This column vector is used to adapt Hive's BytesColumnVector to Flink's BytesColumnVector. */
+public class OrcBytesColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.BytesColumnVector {
+
+ private final BytesColumnVector vector;
+
+ public OrcBytesColumnVector(BytesColumnVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public Bytes getBytes(int i) {
+ int rowId = vector.isRepeating ? 0 : i;
+ byte[][] data = vector.vector;
+ int[] start = vector.start;
+ int[] length = vector.length;
+ return new Bytes(data[rowId], start[rowId], length[rowId]);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java
new file mode 100644
index 00000000..4c8e539d
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDecimalColumnVector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.DecimalData;
+
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+
+import java.math.BigDecimal;
+
+/**
+ * This column vector is used to adapt Hive's DecimalColumnVector to Flink's DecimalColumnVector.
+ */
+public class OrcDecimalColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.DecimalColumnVector {
+
+ private final DecimalColumnVector vector;
+
+ public OrcDecimalColumnVector(DecimalColumnVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ BigDecimal data =
+ vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue();
+ return DecimalData.fromBigDecimal(data, precision, scale);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java
new file mode 100644
index 00000000..69c335b6
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcDoubleColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+
+/**
+ * This column vector is used to adapt Hive's DoubleColumnVector to Flink's float and double
+ * ColumnVector.
+ */
+public class OrcDoubleColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.DoubleColumnVector,
+ org.apache.flink.table.store.data.columnar.FloatColumnVector {
+
+ private final DoubleColumnVector vector;
+
+ public OrcDoubleColumnVector(DoubleColumnVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return vector.vector[vector.isRepeating ? 0 : i];
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return (float) vector.vector[vector.isRepeating ? 0 : i];
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java
new file mode 100644
index 00000000..1b677d2e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLegacyTimestampColumnVector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+/**
+ * This class adapts Hive's legacy (2.0.x) timestamp column vector, which is a
+ * LongColumnVector.
+ */
+public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.TimestampColumnVector {
+
+ private final LongColumnVector hiveVector;
+
+ OrcLegacyTimestampColumnVector(LongColumnVector vector) {
+ super(vector);
+ this.hiveVector = vector;
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ int index = hiveVector.isRepeating ? 0 : i;
+ Timestamp timestamp = toTimestamp(hiveVector.vector[index]);
+ return TimestampData.fromTimestamp(timestamp);
+ }
+
+ // Creates a Hive ColumnVector holding a constant timestamp value.
+ public static ColumnVector createFromConstant(int batchSize, Object value) {
+ LongColumnVector res = new LongColumnVector(batchSize);
+ if (value == null) {
+ res.noNulls = false;
+ res.isNull[0] = true;
+ res.isRepeating = true;
+ } else {
+ Timestamp timestamp =
+ value instanceof LocalDateTime
+ ? Timestamp.valueOf((LocalDateTime) value)
+ : (Timestamp) value;
+ res.fill(fromTimestamp(timestamp));
+ res.isNull[0] = false;
+ }
+ return res;
+ }
+
+ // converting from/to Timestamp is copied from Hive 2.0.0 TimestampUtils
+ private static long fromTimestamp(Timestamp timestamp) {
+ long time = timestamp.getTime();
+ int nanos = timestamp.getNanos();
+ return (time * 1000000) + (nanos % 1000000);
+ }
+
+ private static Timestamp toTimestamp(long timeInNanoSec) {
+ long integralSecInMillis =
+ (timeInNanoSec / 1000000000) * 1000; // Full seconds converted to millis.
+ long nanos = timeInNanoSec % 1000000000; // The nanoseconds.
+ if (nanos < 0) {
+ nanos =
+ 1000000000
+ + nanos; // The positive nano-part that will be added to milliseconds.
+ integralSecInMillis =
+ ((timeInNanoSec / 1000000000) - 1) * 1000; // Reduce by one second.
+ }
+ Timestamp res = new Timestamp(0);
+ res.setTime(integralSecInMillis);
+ res.setNanos((int) nanos);
+ return res;
+ }
+}
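To make the legacy encoding concrete with an illustrative value: a Timestamp
with getTime() == 1500 ms and getNanos() == 500_000_123 packs via fromTimestamp
to 1500 * 1_000_000 + (500_000_123 % 1_000_000) = 1_500_000_123 nanoseconds, and
toTimestamp(1_500_000_123) splits that back into 1000 ms of whole seconds plus
500_000_123 fractional nanos, reproducing the original instant.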
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java
new file mode 100644
index 00000000..27b9662f
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcLongColumnVector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * This column vector is used to adapt Hive's LongColumnVector to Flink's boolean, byte, short, int
+ * and long ColumnVector.
+ */
+public class OrcLongColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.LongColumnVector,
+ org.apache.flink.table.store.data.columnar.BooleanColumnVector,
+ org.apache.flink.table.store.data.columnar.ByteColumnVector,
+ org.apache.flink.table.store.data.columnar.ShortColumnVector,
+ org.apache.flink.table.store.data.columnar.IntColumnVector {
+
+ private final LongColumnVector vector;
+
+ public OrcLongColumnVector(LongColumnVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return vector.vector[vector.isRepeating ? 0 : i];
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return vector.vector[vector.isRepeating ? 0 : i] == 1;
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return (byte) vector.vector[vector.isRepeating ? 0 : i];
+ }
+
+ @Override
+ public int getInt(int i) {
+ return (int) vector.vector[vector.isRepeating ? 0 : i];
+ }
+
+ @Override
+ public short getShort(int i) {
+ return (short) vector.vector[vector.isRepeating ? 0 : i];
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java
new file mode 100644
index 00000000..a62fe9ce
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcMapColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarMapData;
+import org.apache.flink.table.types.logical.MapType;
+
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+
+/** This column vector is used to adapt Hive's MapColumnVector to Flink's MapColumnVector. */
+public class OrcMapColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.MapColumnVector {
+
+ private final MapColumnVector hiveVector;
+ private final ColumnVector keyFlinkVector;
+ private final ColumnVector valueFlinkVector;
+
+ public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
+ super(hiveVector);
+ this.hiveVector = hiveVector;
+ this.keyFlinkVector = createFlinkVector(hiveVector.keys, type.getKeyType());
+ this.valueFlinkVector = createFlinkVector(hiveVector.values, type.getValueType());
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ long offset = hiveVector.offsets[i];
+ long length = hiveVector.lengths[i];
+ return new ColumnarMapData(keyFlinkVector, valueFlinkVector, (int) offset, (int) length);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java
new file mode 100644
index 00000000..12c55e57
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcRowColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.ColumnarRowData;
+import org.apache.flink.table.store.data.columnar.VectorizedColumnBatch;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+
+/** This column vector is used to adapt Hive's StructColumnVector to Flink's RowColumnVector. */
+public class OrcRowColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.RowColumnVector {
+
+ private final ColumnarRowData columnarRowData;
+
+ public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
+ super(hiveVector);
+ int len = hiveVector.fields.length;
+ ColumnVector[] flinkVectors = new ColumnVector[len];
+ for (int i = 0; i < len; i++) {
+ flinkVectors[i] = createFlinkVector(hiveVector.fields[i], type.getTypeAt(i));
+ }
+ this.columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(flinkVectors));
+ }
+
+ @Override
+ public ColumnarRowData getRow(int i) {
+ this.columnarRowData.setRowId(i);
+ return this.columnarRowData;
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java
new file mode 100644
index 00000000..a06388a5
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcSplitReaderUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.orc.TypeDescription;
+
+/** Utilities for ORC types. */
+public class OrcSplitReaderUtil {
+
+ /** See {@code org.apache.flink.table.catalog.hive.util.HiveTypeUtil}. */
+ public static TypeDescription logicalTypeToOrcType(LogicalType type) {
+ type = type.copy(true);
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ return TypeDescription.createChar().withMaxLength(((CharType) type).getLength());
+ case VARCHAR:
+ int len = ((VarCharType) type).getLength();
+ if (len == VarCharType.MAX_LENGTH) {
+ return TypeDescription.createString();
+ } else {
+ return TypeDescription.createVarchar().withMaxLength(len);
+ }
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case VARBINARY:
+ if (type.equals(DataTypes.BYTES().getLogicalType())) {
+ return TypeDescription.createBinary();
+ } else {
+ throw new UnsupportedOperationException(
+ "Not support other binary type: " + type);
+ }
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return TypeDescription.createDecimal()
+ .withScale(decimalType.getScale())
+ .withPrecision(decimalType.getPrecision());
+ case TINYINT:
+ return TypeDescription.createByte();
+ case SMALLINT:
+ return TypeDescription.createShort();
+ case INTEGER:
+ return TypeDescription.createInt();
+ case BIGINT:
+ return TypeDescription.createLong();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TypeDescription.createTimestamp();
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) type;
+ return TypeDescription.createList(logicalTypeToOrcType(arrayType.getElementType()));
+ case MAP:
+ MapType mapType = (MapType) type;
+ return TypeDescription.createMap(
+ logicalTypeToOrcType(mapType.getKeyType()),
+ logicalTypeToOrcType(mapType.getValueType()));
+ case ROW:
+ RowType rowType = (RowType) type;
+ TypeDescription struct = TypeDescription.createStruct();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ struct.addField(
+ rowType.getFieldNames().get(i),
+ logicalTypeToOrcType(rowType.getChildren().get(i)));
+ }
+ return struct;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+}
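A quick sketch of the mapping in action (the two-field schema is hypothetical):

    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.orc.TypeDescription;

    RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
    TypeDescription orcSchema = OrcSplitReaderUtil.logicalTypeToOrcType(rowType);
    // orcSchema.toString() -> "struct<f0:int,f1:string>"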
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java
new file mode 100644
index 00000000..0dbba966
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/reader/OrcTimestampColumnVector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+
+import java.sql.Timestamp;
+
+/**
+ * This column vector is used to adapt Hive's TimestampColumnVector to Flink's
+ * TimestampColumnVector.
+ */
+public class OrcTimestampColumnVector extends AbstractOrcColumnVector
+ implements org.apache.flink.table.store.data.columnar.TimestampColumnVector {
+
+ private final TimestampColumnVector vector;
+
+ public OrcTimestampColumnVector(ColumnVector vector) {
+ super(vector);
+ this.vector = (TimestampColumnVector) vector;
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ int index = vector.isRepeating ? 0 : i;
+ Timestamp timestamp = new Timestamp(vector.time[index]);
+ timestamp.setNanos(vector.nanos[index]);
+ return TimestampData.fromTimestamp(timestamp);
+ }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java
new file mode 100644
index 00000000..e3a80a3b
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/OrcBulkWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that writes data in ORC format.
+ *
+ * @param <T> The type of element written.
+ */
+@Internal
+public class OrcBulkWriter<T> implements BulkWriter<T> {
+
+ private final Writer writer;
+ private final Vectorizer<T> vectorizer;
+ private final VectorizedRowBatch rowBatch;
+
+ public OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+ this.vectorizer = checkNotNull(vectorizer);
+ this.writer = checkNotNull(writer);
+ this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+ // Configure the vectorizer with the writer so that users can add
+ // metadata on the fly through the Vectorizer#vectorize(...) method.
+ this.vectorizer.setWriter(this.writer);
+ }
+
+ @Override
+ public void addElement(T element) throws IOException {
+ vectorizer.vectorize(element, rowBatch);
+ if (rowBatch.size == rowBatch.getMaxSize()) {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (rowBatch.size != 0) {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ flush();
+ writer.close();
+ }
+}
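A minimal usage sketch, assuming the ORC Writer is created directly here for
brevity (in this commit that wiring is OrcWriterFactory's job); the file path
and schema are illustrative:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
    LogicalType[] fieldTypes = {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)};
    Writer orcWriter =
            OrcFile.createWriter(
                    new org.apache.hadoop.fs.Path("/tmp/example.orc"),
                    OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration())
                            .setSchema(schema));
    OrcBulkWriter<RowData> writer =
            new OrcBulkWriter<>(new RowDataVectorizer(schema.toString(), fieldTypes), orcWriter);
    writer.addElement(GenericRowData.of(1, StringData.fromString("a")));
    writer.finish(); // flushes the final partial batch and closes the ORC writer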
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java
new file mode 100644
index 00000000..03608ced
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/PhysicalWriterImpl.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.OrcCodecPool;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
+
+/**
+ * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
+ *
+ * <p>Whereas PhysicalFsWriter implementation works on the basis of a Path, this implementation
+ * leverages Flink's {@link FSDataOutputStream} to write the compressed data.
+ *
+ * <p>NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be
+ * in sync with the new version's PhysicalFsWriter.
+ */
+@Internal
+public class PhysicalWriterImpl implements PhysicalWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
+ private static final byte[] ZEROS = new byte[64 * 1024];
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+ protected final OutStream writer;
+ private final CodedOutputStream protobufWriter;
+ private final CompressionKind compress;
+ private final Map<StreamName, BufferedStream> streams;
+ private final HadoopShims shims;
+ private final int maxPadding;
+ private final int bufferSize;
+ private final long blockSize;
+ private final boolean addBlockPadding;
+ private final boolean writeVariableLengthBlocks;
+
+ private CompressionCodec codec;
+ private FSDataOutputStream out;
+ private long headerLength;
+ private long stripeStart;
+ private long blockOffset;
+ private int metadataLength;
+ private int footerLength;
+
+ public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
+ throws IOException {
+ if (opts.isEnforceBufferSize()) {
+ this.bufferSize = opts.getBufferSize();
+ } else {
+ this.bufferSize =
+ getEstimatedBufferSize(
+ opts.getStripeSize(),
+ opts.getSchema().getMaximumId() + 1,
+ opts.getBufferSize());
+ }
+
+ this.out = out;
+ this.blockOffset = 0;
+ this.blockSize = opts.getBlockSize();
+ this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize());
+ this.compress = opts.getCompress();
+ this.codec = OrcCodecPool.getCodec(this.compress);
+ this.streams = new TreeMap<>();
+ this.writer =
+ new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out));
+ this.shims = opts.getHadoopShims();
+ this.addBlockPadding = opts.getBlockPadding();
+ this.protobufWriter = CodedOutputStream.newInstance(this.writer);
+ this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+ }
+
+ @Override
+ public void writeHeader() throws IOException {
+ this.out.write("ORC".getBytes());
+ this.headerLength = this.out.getPos();
+ }
+
+ @Override
+ public OutputReceiver createDataStream(StreamName name) throws IOException {
+ BufferedStream result = streams.get(name);
+
+ if (result == null) {
+ result = new BufferedStream();
+ streams.put(name, result);
+ }
+
+ return result;
+ }
+
+ @Override
+ public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec)
+ throws IOException {
+ OutputStream stream =
+ new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+ index.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeBloomFilter(
+ StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec)
+ throws IOException {
+ OutputStream stream =
+ new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
+ bloom.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void finalizeStripe(
+ OrcProto.StripeFooter.Builder footerBuilder,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ long indexSize = 0;
+ long dataSize = 0;
+
+ for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+ BufferedStream receiver = pair.getValue();
+ if (!receiver.isSuppressed) {
+ long streamSize = receiver.getOutputSize();
+ StreamName name = pair.getKey();
+ footerBuilder.addStreams(
+ OrcProto.Stream.newBuilder()
+ .setColumn(name.getColumn())
+ .setKind(name.getKind())
+ .setLength(streamSize));
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexSize += streamSize;
+ } else {
+ dataSize += streamSize;
+ }
+ }
+ }
+
+ dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+ OrcProto.StripeFooter footer = footerBuilder.build();
+ // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+ padStripe(indexSize + dataSize + footer.getSerializedSize());
+
+ // write out the data streams
+ for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+ pair.getValue().spillToDiskAndClear(out);
+ }
+
+ // Write out the footer.
+ writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+ }
+
+ @Override
+ public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+ long startPosition = out.getPos();
+ OrcProto.Metadata metadata = builder.build();
+ writeMetadata(metadata);
+ this.metadataLength = (int) (out.getPos() - startPosition);
+ }
+
+ @Override
+ public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+ long bodyLength = out.getPos() - metadataLength;
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ long startPosition = out.getPos();
+ OrcProto.Footer footer = builder.build();
+ writeFileFooter(footer);
+ this.footerLength = (int) (out.getPos() - startPosition);
+ }
+
+ @Override
+ public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+ builder.setFooterLength(footerLength);
+ builder.setMetadataLength(metadataLength);
+
+ OrcProto.PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosition = out.getPos();
+ ps.writeTo(out);
+ long length = out.getPos() - startPosition;
+
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+
+ out.write((int) length);
+ return out.getPos();
+ }
+
+ @Override
+ public void close() {
+ // Just release the codec but don't close the internal stream here to avoid
+ // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+ OrcCodecPool.returnCodec(compress, codec);
+ codec = null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ long start = out.getPos();
+ int length = buffer.remaining();
+ long availBlockSpace = blockSize - (start % blockSize);
+
+ // See if the stripe can fit in the current HDFS block; else pad the remaining
+ // space in the block.
+ if (length < blockSize && length > availBlockSpace && addBlockPadding) {
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+ LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace));
+ start += availBlockSpace;
+ while (availBlockSpace > 0) {
+ int writeLen = (int) Math.min(availBlockSpace, pad.length);
+ out.write(pad, 0, writeLen);
+ availBlockSpace -= writeLen;
+ }
+ }
+
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+ dirEntry.setOffset(start);
+ }
+
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return this.codec;
+ }
+
+ @Override
+ public long getFileBytes(int column) {
+ long size = 0;
+
+ for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+ final BufferedStream receiver = pair.getValue();
+ if (!receiver.isSuppressed) {
+
+ final StreamName name = pair.getKey();
+ if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
+ size += receiver.getOutputSize();
+ }
+ }
+ }
+
+ return size;
+ }
+
+ private void padStripe(long stripeSize) throws IOException {
+ this.stripeStart = out.getPos();
+ long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
+
+ // We only have options if this isn't the first stripe in the block
+ if (previousBytesInBlock > 0) {
+ if (previousBytesInBlock + stripeSize >= blockSize) {
+ // Try making a short block
+ if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
+ blockOffset = stripeStart;
+ } else if (addBlockPadding) {
+ // if we cross the block boundary, figure out what we should do
+ long padding = blockSize - previousBytesInBlock;
+ if (padding <= maxPadding) {
+ writeZeros(out, padding);
+ stripeStart += padding;
+ }
+ }
+ }
+ }
+ }
+
+ private void writeStripeFooter(
+ OrcProto.StripeFooter footer,
+ long dataSize,
+ long indexSize,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ writeStripeFooter(footer);
+
+ dirEntry.setOffset(stripeStart);
+ dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize);
+ }
+
+ protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
+ metadata.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ }
+
+ protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ }
+
+ protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ }
+
+ private static void writeZeros(OutputStream output, long remaining) throws IOException {
+ while (remaining > 0) {
+ long size = Math.min(ZEROS.length, remaining);
+ output.write(ZEROS, 0, (int) size);
+ remaining -= size;
+ }
+ }
+
+ private static class DirectStream implements OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ this.output.write(
+ buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+
+ @Override
+ public void suppress() {
+ throw new UnsupportedOperationException("Can't suppress direct stream");
+ }
+ }
+
+ private static final class BufferedStream implements OutputReceiver {
+ private boolean isSuppressed = false;
+ private final List<ByteBuffer> output = new ArrayList<>();
+
+ @Override
+ public void output(ByteBuffer buffer) {
+ if (!isSuppressed) {
+ output.add(buffer);
+ }
+ }
+
+ @Override
+ public void suppress() {
+ isSuppressed = true;
+ output.clear();
+ }
+
+ void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+ if (!isSuppressed) {
+ for (ByteBuffer buffer : output) {
+ raw.write(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ output.clear();
+ }
+ isSuppressed = false;
+ }
+
+ public long getOutputSize() {
+ long result = 0;
+ for (ByteBuffer buffer : output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+ }
+}
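For orientation, a hedged sketch of how this writer is typically wired in:
ORC's WriterOptions accepts a custom PhysicalWriter, after which the Path
passed to OrcFile.createWriter is no longer used for actual I/O ('fileSystem',
'path', and 'schema' below stand in for the caller's context):

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Writer;

    FSDataOutputStream out = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
    OrcFile.WriterOptions opts =
            OrcFile.writerOptions(new org.apache.hadoop.conf.Configuration()).setSchema(schema);
    opts.physicalWriter(new PhysicalWriterImpl(out, opts));
    Writer writer = OrcFile.createWriter(new org.apache.hadoop.fs.Path("/placeholder"), opts);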
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java
new file mode 100644
index 00000000..6e3f32dc
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/RowDataVectorizer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.sql.Timestamp;
+
+/** A {@link Vectorizer} of {@link RowData} type element. */
+public class RowDataVectorizer extends Vectorizer<RowData> {
+
+ private final LogicalType[] fieldTypes;
+
+ public RowDataVectorizer(String schema, LogicalType[] fieldTypes) {
+ super(schema);
+ this.fieldTypes = fieldTypes;
+ }
+
+ @Override
+ public void vectorize(RowData row, VectorizedRowBatch batch) {
+ int rowId = batch.size++;
+ for (int i = 0; i < row.getArity(); ++i) {
+ setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
+ }
+ }
+
+ private static void setColumn(
+ int rowId, ColumnVector column, LogicalType type, RowData row, int columnId) {
+ if (row.isNullAt(columnId)) {
+ column.noNulls = false;
+ column.isNull[rowId] = true;
+ return;
+ }
+
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ byte[] bytes = row.getString(columnId).toBytes();
+ vector.setVal(rowId, bytes, 0, bytes.length);
+ break;
+ }
+ case BOOLEAN:
+ {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = row.getBoolean(columnId) ? 1 : 0;
+ break;
+ }
+ case BINARY:
+ case VARBINARY:
+ {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ byte[] bytes = row.getBinary(columnId);
+ vector.setVal(rowId, bytes, 0, bytes.length);
+ break;
+ }
+ case DECIMAL:
+ {
+ DecimalType dt = (DecimalType) type;
+ DecimalColumnVector vector = (DecimalColumnVector) column;
+ vector.set(
+ rowId,
+ HiveDecimal.create(
+ row.getDecimal(columnId, dt.getPrecision(), dt.getScale())
+ .toBigDecimal()));
+ break;
+ }
+ case TINYINT:
+ {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = row.getByte(columnId);
+ break;
+ }
+ case SMALLINT:
+ {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = row.getShort(columnId);
+ break;
+ }
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTEGER:
+ {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = row.getInt(columnId);
+ break;
+ }
+ case BIGINT:
+ {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = row.getLong(columnId);
+ break;
+ }
+ case FLOAT:
+ {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] = row.getFloat(columnId);
+ break;
+ }
+ case DOUBLE:
+ {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] = row.getDouble(columnId);
+ break;
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ {
+ TimestampType tt = (TimestampType) type;
+ Timestamp timestamp =
+ row.getTimestamp(columnId, tt.getPrecision()).toTimestamp();
+ TimestampColumnVector vector = (TimestampColumnVector) column;
+ vector.set(rowId, timestamp);
+ break;
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ {
+ LocalZonedTimestampType lt = (LocalZonedTimestampType) type;
+ Timestamp timestamp =
+ row.getTimestamp(columnId, lt.getPrecision()).toTimestamp();
+ TimestampColumnVector vector = (TimestampColumnVector) column;
+ vector.set(rowId, timestamp);
+ break;
+ }
+ case ARRAY:
+ {
+ ListColumnVector listColumnVector = (ListColumnVector) column;
+ setColumn(rowId, listColumnVector, type, row, columnId);
+ break;
+ }
+ case MAP:
+ {
+ MapColumnVector mapColumnVector = (MapColumnVector) column;
+ setColumn(rowId, mapColumnVector, type, row, columnId);
+ break;
+ }
+ case ROW:
+ {
+ StructColumnVector structColumnVector = (StructColumnVector) column;
+ setColumn(rowId, structColumnVector, type, row, columnId);
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static void setColumn(
+ int rowId,
+ ListColumnVector listColumnVector,
+ LogicalType type,
+ RowData row,
+ int columnId) {
+ ArrayData arrayData = row.getArray(columnId);
+ ArrayType arrayType = (ArrayType) type;
+ listColumnVector.lengths[rowId] = arrayData.size();
+ listColumnVector.offsets[rowId] = listColumnVector.childCount;
+ listColumnVector.childCount += listColumnVector.lengths[rowId];
+ listColumnVector.child.ensureSize(
+ listColumnVector.childCount, listColumnVector.offsets[rowId] != 0);
+
+ RowData convertedRowData = convert(arrayData, arrayType.getElementType());
+ for (int i = 0; i < arrayData.size(); i++) {
+ setColumn(
+ (int) listColumnVector.offsets[rowId] + i,
+ listColumnVector.child,
+ arrayType.getElementType(),
+ convertedRowData,
+ i);
+ }
+ }
+
+ private static void setColumn(
+ int rowId,
+ MapColumnVector mapColumnVector,
+ LogicalType type,
+ RowData row,
+ int columnId) {
+ MapData mapData = row.getMap(columnId);
+ MapType mapType = (MapType) type;
+ ArrayData keyArray = mapData.keyArray();
+ ArrayData valueArray = mapData.valueArray();
+ mapColumnVector.lengths[rowId] = mapData.size();
+ mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
+ mapColumnVector.childCount += mapColumnVector.lengths[rowId];
+ mapColumnVector.keys.ensureSize(
+ mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
+ mapColumnVector.values.ensureSize(
+ mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
+
+ RowData convertedKeyRowData = convert(keyArray, mapType.getKeyType());
+ RowData convertedValueRowData = convert(valueArray, mapType.getValueType());
+ for (int i = 0; i < keyArray.size(); i++) {
+ setColumn(
+ (int) mapColumnVector.offsets[rowId] + i,
+ mapColumnVector.keys,
+ mapType.getKeyType(),
+ convertedKeyRowData,
+ i);
+ setColumn(
+ (int) mapColumnVector.offsets[rowId] + i,
+ mapColumnVector.values,
+ mapType.getValueType(),
+ convertedValueRowData,
+ i);
+ }
+ }
+
+ private static void setColumn(
+ int rowId,
+ StructColumnVector structColumnVector,
+ LogicalType type,
+ RowData row,
+ int columnId) {
+ RowData structRow = row.getRow(columnId, structColumnVector.fields.length);
+ RowType rowType = (RowType) type;
+ for (int i = 0; i < structRow.getArity(); i++) {
+ ColumnVector cv = structColumnVector.fields[i];
+ setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
+ }
+ }
+
+ /**
+ * Converts ArrayData to RowData so that {@link RowDataVectorizer#setColumn(int,
+ * ColumnVector, LogicalType, RowData, int)} can be called recursively on the array elements.
+ *
+ * @param arrayData the input ArrayData.
+ * @param arrayFieldType the LogicalType of the elements in the input ArrayData.
+ * @return a RowData whose fields are the elements of the input array.
+ */
+ private static RowData convert(ArrayData arrayData, LogicalType arrayFieldType) {
+ GenericRowData rowData = new GenericRowData(arrayData.size());
+ ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(arrayFieldType);
+ for (int i = 0; i < arrayData.size(); i++) {
+ rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
+ }
+ return rowData;
+ }
+}
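For readers of the vectorizer above, a minimal usage sketch (not part of the patch itself; the class name, schema string, and sample data are illustrative assumptions). It shows how vectorize(...) fills a VectorizedRowBatch, with the ARRAY field going through the ArrayData-to-RowData conversion performed by convert(...):

    import org.apache.flink.table.data.GenericArrayData;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.store.format.orc.writer.RowDataVectorizer;
    import org.apache.flink.table.types.logical.ArrayType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    // Illustrative sketch, not part of this patch.
    public class RowDataVectorizerSketch {
        public static void main(String[] args) {
            String schema = "struct<id:int,scores:array<int>>";
            LogicalType[] fieldTypes = {new IntType(), new ArrayType(new IntType())};
            RowDataVectorizer vectorizer = new RowDataVectorizer(schema, fieldTypes);

            // An empty batch whose column vectors match the ORC schema.
            VectorizedRowBatch batch = TypeDescription.fromString(schema).createRowBatch();

            // Row (1, [7, 8, 9]); the array elements are expanded into the
            // ListColumnVector's child vector via convert(...).
            GenericRowData row =
                    GenericRowData.of(1, new GenericArrayData(new int[] {7, 8, 9}));
            vectorizer.vectorize(row, batch);

            System.out.println(batch.size); // prints 1
        }
    }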
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java
new file mode 100644
index 00000000..dbddc16e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/writer/Vectorizer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class provides an abstracted set of methods to handle the lifecycle of {@link
+ * VectorizedRowBatch}.
+ *
+ * <p>Users have to extend this class and override the vectorize() method with the logic to
+ * transform the element to a {@link VectorizedRowBatch}.
+ *
+ * @param <T> The type of the element
+ */
+@PublicEvolving
+public abstract class Vectorizer<T> implements Serializable {
+
+ private final TypeDescription schema;
+
+ private transient Writer writer;
+
+ public Vectorizer(final String schema) {
+ checkNotNull(schema);
+ this.schema = TypeDescription.fromString(schema);
+ }
+
+ /**
+ * Provides the ORC schema.
+ *
+ * @return the ORC schema
+ */
+ public TypeDescription getSchema() {
+ return this.schema;
+ }
+
+ /**
+ * This method is intended to be used only by the {@link OrcBulkWriter}; users should not
+ * call it directly.
+ *
+ * @param writer the underlying ORC Writer.
+ */
+ public void setWriter(Writer writer) {
+ this.writer = writer;
+ }
+
+ /**
+ * Adds arbitrary user metadata to the outgoing ORC file.
+ *
+ * <p>Users who want to dynamically add new metadata, based either on the input or on an
+ * external system, can do so by calling <code>addUserMetadata(...)</code> inside the overridden
+ * vectorize() method.
+ *
+ * @param key a key to label the data with.
+ * @param value the contents of the metadata.
+ */
+ public void addUserMetadata(String key, ByteBuffer value) {
+ this.writer.addUserMetadata(key, value);
+ }
+
+ /**
+ * Transforms the provided element to ColumnVectors and sets them in the exposed
+ * VectorizedRowBatch.
+ *
+ * @param element The input element
+ * @param batch The batch to write the ColumnVectors
+ * @throws IOException if there is an error while transforming the input.
+ */
+ public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
+}
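As a minimal sketch of the lifecycle this contract implies — reusing the Record and RecordVectorizer test helpers added later in this patch, with an illustrative output path, and assuming create(...) returns a BulkWriter as in Flink's OrcBulkWriterFactory — the factory opens the ORC Writer, hands it to the Vectorizer via setWriter(...), and each addElement(...) call is routed to vectorize(...):

    // Illustrative sketch, not part of this patch; placed alongside the test
    // helpers so Record and RecordVectorizer resolve without imports.
    package org.apache.flink.table.store.format.orc;

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.local.LocalDataOutputStream;

    import java.io.File;
    import java.io.IOException;

    public class OrcWriterSketch {
        public static void main(String[] args) throws IOException {
            OrcWriterFactory<Record> factory =
                    new OrcWriterFactory<>(
                            new RecordVectorizer("struct<_col0:string,_col1:int>"));
            // Assumed to return a BulkWriter, mirroring Flink's OrcBulkWriterFactory.
            BulkWriter<Record> writer =
                    factory.create(new LocalDataOutputStream(new File("/tmp/sketch.orc")));
            writer.addElement(new Record("flink", 11));
            writer.finish(); // flushes the final batch and closes the ORC writer
        }
    }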
diff --git a/flink-table-store-format/src/main/resources/META-INF/NOTICE b/flink-table-store-format/src/main/resources/META-INF/NOTICE
index 4103b2d0..89edd908 100644
--- a/flink-table-store-format/src/main/resources/META-INF/NOTICE
+++ b/flink-table-store-format/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,12 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- org.apache.avro:avro:1.11.1
+- com.fasterxml.jackson.core:jackson-core:2.13.4
+- com.fasterxml.jackson.core:jackson-databind:2.13.4.2
+- com.fasterxml.jackson.core:jackson-annotations:2.13.4
+- org.apache.commons:commons-compress:1.21
+
- org.apache.parquet:parquet-avro:1.12.3
- org.apache.parquet:parquet-hadoop:1.12.3
- org.apache.parquet:parquet-column:1.12.3
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java
new file mode 100644
index 00000000..c6ef91de
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.format.avro.AvroBulkFormatTestUtils.ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AbstractAvroBulkFormat}. */
+class AvroBulkFormatTest {
+
+ private static final List<RowData> TEST_DATA =
+ Arrays.asList(
+ // -------- batch 0, block start 232 --------
+ GenericRowData.of(
+ StringData.fromString("AvroBulk"), StringData.fromString("FormatTest")),
+ GenericRowData.of(
+ StringData.fromString("Apache"), StringData.fromString("Flink")),
+ GenericRowData.of(
+ StringData.fromString(
+ "永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
+ + "长咸集。此地有崇山峻岭,茂林修竹,又有清流激湍,映带左右。引"
+ + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
+ + "叙幽情。"),
+ StringData.fromString("")),
+ // -------- batch 1, block start 593 --------
+ GenericRowData.of(
+ StringData.fromString("File"), StringData.fromString("Format")),
+ GenericRowData.of(
+ null,
+ StringData.fromString(
+ "This is a string with English, 中文 and even 🍎🍌🍑🥝🍍🥭🍐")),
+ // -------- batch 2, block start 705 --------
+ GenericRowData.of(
+ StringData.fromString("block with"),
+ StringData.fromString("only one record"))
+ // -------- file length 752 --------
+ );
+ private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);
+
+ private File tmpFile;
+
+ @BeforeEach
+ public void before() throws IOException {
+ tmpFile = Files.createTempFile("avro-bulk-format-test", ".avro").toFile();
+ tmpFile.createNewFile();
+ FileOutputStream out = new FileOutputStream(tmpFile);
+
+ Schema schema = AvroSchemaConverter.convertToSchema(ROW_TYPE);
+ RowDataToAvroConverters.RowDataToAvroConverter converter =
+ RowDataToAvroConverters.createConverter(ROW_TYPE);
+
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+ dataFileWriter.create(schema, out);
+
+ // Generate the sync points manually in order to test blocks.
+ long syncBlock1 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(0)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(1)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(2)));
+ long syncBlock2 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(3)));
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(4)));
+ long syncBlock3 = dataFileWriter.sync();
+ dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(5)));
+ long syncEnd = dataFileWriter.sync();
+ dataFileWriter.close();
+
+ // These values should be constant if nothing else changes with the file.
+ assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, syncBlock2, syncBlock3));
+ assertThat(tmpFile).hasSize(syncEnd);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ FileUtils.deleteFileOrDirectory(tmpFile);
+ }
+
+ @Test
+ void testReadWholeFileWithOneSplit() throws IOException {
+ AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+ new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+ assertSplit(
+ bulkFormat,
+ Collections.singletonList(
+ new SplitInfo(
+ 0,
+ tmpFile.length(),
+ Arrays.asList(
+ new BatchInfo(0, 3),
+ new BatchInfo(3, 5),
+ new BatchInfo(5, 6)))));
+ }
+
+ @Test
+ void testReadWholeFileWithMultipleSplits() throws IOException {
+ AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+ new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+ long splitLength = tmpFile.length() / 3;
+ assertSplit(
+ bulkFormat,
+ Arrays.asList(
+ new SplitInfo(
+ 0, splitLength, Collections.singletonList(new BatchInfo(0, 3))),
+ new SplitInfo(splitLength, splitLength * 2, Collections.emptyList()),
+ new SplitInfo(
+ splitLength * 2,
+ tmpFile.length(),
+ Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
+ }
+
+ @Test
+ void testSplitsAtCriticalLocations() throws IOException {
+ AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+ new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+ assertSplit(
+ bulkFormat,
+ Arrays.asList(
+ // ends just before the new block
+ new SplitInfo(
+ BLOCK_STARTS.get(0) - DataFileConstants.SYNC_SIZE,
+ BLOCK_STARTS.get(1) - DataFileConstants.SYNC_SIZE,
+ Collections.singletonList(new BatchInfo(0, 3))),
+ // ends just at the beginning of new block
+ new SplitInfo(
+ BLOCK_STARTS.get(1) - DataFileConstants.SYNC_SIZE,
+ BLOCK_STARTS.get(2) - DataFileConstants.SYNC_SIZE + 1,
+ Arrays.asList(new BatchInfo(3, 5), new BatchInfo(5, 6)))));
+ }
+
+ @Test
+ void testRestoreReader() throws IOException {
+ AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
+ new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
+ long splitLength = tmpFile.length() / 3;
+ String splitId = UUID.randomUUID().toString();
+
+ FileSourceSplit split =
+ new FileSourceSplit(
+ splitId, new Path(tmpFile.toString()), splitLength * 2, tmpFile.length());
+ BulkFormat.Reader<RowData> reader = bulkFormat.createReader(new Configuration(), split);
+ long offset1 = assertBatch(reader, new BatchInfo(3, 5));
+ assertBatch(reader, new BatchInfo(5, 6));
+ assertThat(reader.readBatch()).isNull();
+ reader.close();
+
+ split =
+ new FileSourceSplit(
+ splitId,
+ new Path(tmpFile.toString()),
+ splitLength * 2,
+ tmpFile.length(),
+ StringUtils.EMPTY_STRING_ARRAY,
+ new CheckpointedPosition(offset1, 1));
+ reader = bulkFormat.restoreReader(new Configuration(), split);
+ long offset2 = assertBatch(reader, new BatchInfo(3, 5), 1);
+ assertBatch(reader, new BatchInfo(5, 6));
+ assertThat(reader.readBatch()).isNull();
+ reader.close();
+
+ assertThat(offset2).isEqualTo(offset1);
+ }
+
+ private void assertSplit(
+ AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat, List<SplitInfo> splitInfos)
+ throws IOException {
+ for (SplitInfo splitInfo : splitInfos) {
+ FileSourceSplit split =
+ new FileSourceSplit(
+ UUID.randomUUID().toString(),
+ new Path(tmpFile.toString()),
+ splitInfo.start,
+ splitInfo.end - splitInfo.start);
+ BulkFormat.Reader<RowData> reader = bulkFormat.createReader(new Configuration(), split);
+ List<Long> offsets = new ArrayList<>();
+ for (BatchInfo batch : splitInfo.batches) {
+ offsets.add(assertBatch(reader, batch));
+ }
+ assertThat(reader.readBatch()).isNull();
+ for (int j = 1; j < offsets.size(); j++) {
+ assertThat(offsets.get(j)).isGreaterThan(offsets.get(j - 1));
+ }
+ reader.close();
+ }
+ }
+
+ private long assertBatch(BulkFormat.Reader<RowData> reader, BatchInfo batchInfo)
+ throws IOException {
+ return assertBatch(reader, batchInfo, 0);
+ }
+
+ private long assertBatch(
+ BulkFormat.Reader<RowData> reader, BatchInfo batchInfo, int initialSkipCount)
+ throws IOException {
+ long ret = -1;
+ int skipCount = initialSkipCount;
+ BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+ for (RecordAndPosition<RowData> recordAndPos = iterator.next();
+ recordAndPos != null;
+ recordAndPos = iterator.next()) {
+ if (ret == -1) {
+ ret = recordAndPos.getOffset();
+ }
+ assertThat(recordAndPos.getRecord())
+ .isEqualTo(TEST_DATA.get(batchInfo.start + skipCount));
+ assertThat(recordAndPos.getOffset()).isEqualTo(ret);
+ skipCount++;
+ assertThat(recordAndPos.getRecordSkipCount()).isEqualTo(skipCount);
+ }
+ assertThat(skipCount).isEqualTo(batchInfo.end - batchInfo.start);
+ iterator.releaseBatch();
+ return ret;
+ }
+
+ private static class SplitInfo {
+ private final long start;
+ private final long end;
+ private final List<BatchInfo> batches;
+
+ private SplitInfo(long start, long end, List<BatchInfo> batches) {
+ this.start = start;
+ this.end = end;
+ this.batches = batches;
+ }
+ }
+
+ private static class BatchInfo {
+ private final int start;
+ private final int end;
+
+ private BatchInfo(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java
new file mode 100644
index 00000000..57279a11
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/avro/AvroBulkFormatTestUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.function.Function;
+
+/** Testing utils for tests related to {@link AbstractAvroBulkFormat}. */
+public class AvroBulkFormatTestUtils {
+
+ public static final RowType ROW_TYPE =
+ RowType.of(
+ false,
+ new LogicalType[] {
+ DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()
+ },
+ new String[] {"a", "b"});
+
+ /** {@link AbstractAvroBulkFormat} for tests. */
+ public static class TestingAvroBulkFormat
+ extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
+
+ protected TestingAvroBulkFormat() {
+ super(AvroSchemaConverter.convertToSchema(ROW_TYPE));
+ }
+
+ @Override
+ protected GenericRecord createReusedAvroRecord() {
+ return new GenericData.Record(readerSchema);
+ }
+
+ @Override
+ protected Function<GenericRecord, RowData> createConverter() {
+ AvroToRowDataConverters.AvroToRowDataConverter converter =
+ AvroToRowDataConverters.createRowConverter(ROW_TYPE);
+ return record -> record == null ? null : (RowData) converter.convert(record);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return InternalTypeInfo.of(ROW_TYPE);
+ }
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java
new file mode 100644
index 00000000..b1e1071b
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterTestUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util class for the OrcBulkWriter tests. */
+public class OrcBulkWriterTestUtil {
+
+ public static final String USER_METADATA_KEY = "userKey";
+ public static final ByteBuffer USER_METADATA_VALUE = ByteBuffer.wrap("hello".getBytes());
+
+ public static void validate(File files, List<Record> expected) throws IOException {
+ final File[] buckets = files.listFiles();
+ assertThat(buckets).isNotNull();
+ assertThat(buckets).hasSize(1);
+
+ final File[] partFiles = buckets[0].listFiles();
+ assertThat(partFiles).isNotNull();
+
+ for (File partFile : partFiles) {
+ assertThat(partFile.length()).isGreaterThan(0);
+
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration());
+ Reader reader =
+ OrcFile.createReader(
+ new org.apache.hadoop.fs.Path(partFile.toURI()), readerOptions);
+
+ assertThat(reader.getNumberOfRows()).isEqualTo(3);
+ assertThat(reader.getSchema().getFieldNames()).hasSize(2);
+ assertThat(reader.getCompressionKind()).isSameAs(CompressionKind.LZ4);
+ assertThat(reader.hasMetadataValue(USER_METADATA_KEY)).isTrue();
+ assertThat(reader.getMetadataKeys()).contains(USER_METADATA_KEY);
+
+ List<Record> results = getResults(reader);
+
+ assertThat(results).hasSize(3).isEqualTo(expected);
+ }
+ }
+
+ private static List<Record> getResults(Reader reader) throws IOException {
+ List<Record> results = new ArrayList<>();
+
+ RecordReader recordReader = reader.rows();
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+
+ while (recordReader.nextBatch(batch)) {
+ BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+ LongColumnVector intVector = (LongColumnVector) batch.cols[1];
+ for (int r = 0; r < batch.size; r++) {
+ String name =
+ new String(
+ stringVector.vector[r],
+ stringVector.start[r],
+ stringVector.length[r]);
+ int age = (int) intVector.vector[r];
+
+ results.add(new Record(name, age));
+ }
+ }
+ recordReader.close();
+
+ return results;
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
index 03d2a69f..2a3e2912 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.format.orc;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
+import org.apache.flink.table.store.format.orc.filter.OrcFileStatsExtractor;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
index 73555294..c0e927bc 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
@@ -18,9 +18,10 @@
package org.apache.flink.table.store.format.orc;
-import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.store.format.orc.filter.OrcPredicateFunctionVisitor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java
new file mode 100644
index 00000000..41a61684
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcReaderFactoryTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.format.orc.filter.OrcFilters;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link OrcReaderFactory}. */
+class OrcReaderFactoryTest {
+
+ /** Small batch size to exercise more boundary conditions. */
+ protected static final int BATCH_SIZE = 9;
+
+ private static final RowType FLAT_FILE_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType()
+ },
+ new String[] {
+ "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7",
+ "_col8"
+ });
+
+ private static final RowType DECIMAL_FILE_TYPE =
+ RowType.of(new LogicalType[] {new DecimalType(10, 5)}, new String[] {"_col0"});
+
+ private static Path flatFile;
+ private static Path decimalFile;
+
+ @BeforeAll
+ static void setupFiles(@TempDir java.nio.file.Path tmpDir) {
+ flatFile = copyFileFromResource("test-data-flat.orc", tmpDir.resolve("test-data-flat.orc"));
+ decimalFile =
+ copyFileFromResource(
+ "test-data-decimal.orc", tmpDir.resolve("test-data-decimal.orc"));
+ }
+
+ @Test
+ void testReadFileInSplits() throws IOException {
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ AtomicLong totalF0 = new AtomicLong(0);
+
+ // read all splits
+ for (FileSourceSplit split : createSplits(flatFile, 4)) {
+ forEach(
+ format,
+ split,
+ row -> {
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.isNullAt(1)).isFalse();
+ totalF0.addAndGet(row.getInt(0));
+ assertThat(row.getString(1).toString()).isNotNull();
+ cnt.incrementAndGet();
+ });
+ }
+
+ // check that all rows have been read
+ assertThat(cnt.get()).isEqualTo(1920800);
+ assertThat(totalF0.get()).isEqualTo(1844737280400L);
+ }
+
+ @Test
+ void testReadFileWithSelectFields() throws IOException {
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ AtomicLong totalF0 = new AtomicLong(0);
+
+ // read all splits
+ for (FileSourceSplit split : createSplits(flatFile, 4)) {
+ forEach(
+ format,
+ split,
+ row -> {
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.isNullAt(1)).isFalse();
+ assertThat(row.isNullAt(2)).isFalse();
+ assertThat(row.getString(0).toString()).isNotNull();
+ totalF0.addAndGet(row.getInt(1));
+ assertThat(row.getString(2).toString()).isNotNull();
+ cnt.incrementAndGet();
+ });
+ }
+
+ // check that all rows have been read
+ assertThat(cnt.get()).isEqualTo(1920800);
+ assertThat(totalF0.get()).isEqualTo(1844737280400L);
+ }
+
+ @Test
+ void testReadDecimalTypeFile() throws IOException {
+ OrcReaderFactory format = createFormat(DECIMAL_FILE_TYPE, new int[] {0});
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ AtomicInteger nullCount = new AtomicInteger(0);
+
+ // read all splits
+ for (FileSourceSplit split : createSplits(decimalFile, 4)) {
+ forEach(
+ format,
+ split,
+ row -> {
+ if (cnt.get() == 0) {
+ // validate first row
+ assertThat(row).isNotNull();
+ assertThat(row.getArity()).isEqualTo(1);
+ assertThat(row.getDecimal(0, 10, 5))
+ .isEqualTo(DecimalDataUtils.castFrom(-1000.5d, 10, 5));
+ } else {
+ if (!row.isNullAt(0)) {
+ assertThat(row.getDecimal(0, 10, 5)).isNotNull();
+ } else {
+ nullCount.incrementAndGet();
+ }
+ }
+ cnt.incrementAndGet();
+ });
+ }
+
+ assertThat(cnt.get()).isEqualTo(6000);
+ assertThat(nullCount.get()).isEqualTo(2000);
+ }
+
+ @Test
+ void testReadFileAndRestore() throws IOException {
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
+
+ // pick a middle split
+ FileSourceSplit split = createSplits(flatFile, 3).get(1);
+
+ int expectedCnt = 660000;
+
+ innerTestRestore(format, split, expectedCnt, 656700330000L);
+ }
+
+ @Test
+ void testReadFileAndRestoreWithFilter() throws IOException {
+ List<OrcFilters.Predicate> filter =
+ Collections.singletonList(
+ new OrcFilters.Or(
+ new OrcFilters.Between(
+ "_col0", PredicateLeaf.Type.LONG, 0L, 975000L),
+ new OrcFilters.Equals("_col0", PredicateLeaf.Type.LONG, 980001L),
+ new OrcFilters.Between(
+ "_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L)));
+ OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
+
+ // pick a middle split
+ FileSourceSplit split = createSplits(flatFile, 1).get(0);
+
+ int expectedCnt = 1795000;
+ long expectedTotalF0 = 1615113397500L;
+
+ innerTestRestore(format, split, expectedCnt, expectedTotalF0);
+ }
+
+ private void innerTestRestore(
+ OrcReaderFactory format, FileSourceSplit split, int expectedCnt, long expectedTotalF0)
+ throws IOException {
+ AtomicInteger cnt = new AtomicInteger(0);
+ AtomicLong totalF0 = new AtomicLong(0);
+
+ Consumer<RowData> consumer =
+ row -> {
+ assertThat(row.isNullAt(0)).isFalse();
+ assertThat(row.isNullAt(1)).isFalse();
+ totalF0.addAndGet(row.getInt(0));
+ assertThat(row.getString(1).toString()).isNotNull();
+ cnt.incrementAndGet();
+ };
+
+ Utils.forEachRemaining(createReader(format, split), consumer);
+
+ // check that all rows have been read
+ assertThat(cnt.get()).isEqualTo(expectedCnt);
+ assertThat(totalF0.get()).isEqualTo(expectedTotalF0);
+ }
+
+ protected OrcReaderFactory createFormat(RowType formatType, int[] selectedFields) {
+ return createFormat(formatType, selectedFields, new ArrayList<>());
+ }
+
+ protected OrcReaderFactory createFormat(
+ RowType formatType,
+ int[] selectedFields,
+ List<OrcFilters.Predicate> conjunctPredicates) {
+ return new OrcReaderFactory(
+ new Configuration(), formatType, selectedFields, conjunctPredicates, BATCH_SIZE);
+ }
+
+ private BulkFormat.Reader<RowData> createReader(OrcReaderFactory format, FileSourceSplit split)
+ throws IOException {
+ return format.createReader(new org.apache.flink.configuration.Configuration(), split);
+ }
+
+ private void forEach(OrcReaderFactory format, FileSourceSplit split, Consumer<RowData> action)
+ throws IOException {
+ Utils.forEachRemaining(createReader(format, split), action);
+ }
+
+ static Path copyFileFromResource(String resourceName, java.nio.file.Path file) {
+ try (InputStream resource =
+ OrcReaderFactoryTest.class
+ .getClassLoader()
+ .getResource(resourceName)
+ .openStream()) {
+ Files.createDirectories(file.getParent());
+ Files.copy(resource, file);
+ return new Path(file.toString());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static List<FileSourceSplit> createSplits(Path path, int minNumSplits)
+ throws IOException {
+ final List<FileSourceSplit> splits = new ArrayList<>(minNumSplits);
+ final FileStatus fileStatus = path.getFileSystem().getFileStatus(path);
+ final long len = fileStatus.getLen();
+ final long preferSplitSize = len / minNumSplits + (len % minNumSplits == 0 ? 0 : 1);
+ int splitNum = 0;
+ long position = 0;
+ while (position < len) {
+ long splitLen = Math.min(preferSplitSize, len - position);
+ splits.add(
+ new FileSourceSplit(
+ String.valueOf(splitNum++),
+ path,
+ position,
+ splitLen,
+ fileStatus.getModificationTime(),
+ len));
+ position += splitLen;
+ }
+ return splits;
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java
new file mode 100644
index 00000000..af8cd66a
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcSplitReaderUtilTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.store.format.orc.reader.OrcSplitReaderUtil.logicalTypeToOrcType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link OrcSplitReaderUtil}. */
+class OrcSplitReaderUtilTest {
+
+ @Test
+ void testLogicalTypeToOrcType() {
+ test("boolean", DataTypes.BOOLEAN());
+ test("char(123)", DataTypes.CHAR(123));
+ test("varchar(123)", DataTypes.VARCHAR(123));
+ test("string", DataTypes.STRING());
+ test("binary", DataTypes.BYTES());
+ test("tinyint", DataTypes.TINYINT());
+ test("smallint", DataTypes.SMALLINT());
+ test("int", DataTypes.INT());
+ test("bigint", DataTypes.BIGINT());
+ test("float", DataTypes.FLOAT());
+ test("double", DataTypes.DOUBLE());
+ test("date", DataTypes.DATE());
+ test("timestamp", DataTypes.TIMESTAMP());
+ test("array<float>", DataTypes.ARRAY(DataTypes.FLOAT()));
+ test("map<float,bigint>", DataTypes.MAP(DataTypes.FLOAT(), DataTypes.BIGINT()));
+ test(
+ "struct<int0:int,str1:string,double2:double,row3:struct<int0:int,int1:int>>",
+ DataTypes.ROW(
+ DataTypes.FIELD("int0", DataTypes.INT()),
+ DataTypes.FIELD("str1", DataTypes.STRING()),
+ DataTypes.FIELD("double2", DataTypes.DOUBLE()),
+ DataTypes.FIELD(
+ "row3",
+ DataTypes.ROW(
+ DataTypes.FIELD("int0", DataTypes.INT()),
+ DataTypes.FIELD("int1", DataTypes.INT())))));
+ test("decimal(4,2)", DataTypes.DECIMAL(4, 2));
+ }
+
+ private void test(String expected, DataType type) {
+ assertThat(logicalTypeToOrcType(type.getLogicalType())).hasToString(expected);
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java
new file mode 100644
index 00000000..e222e828
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcWriterFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcFile;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the behavior of {@link OrcWriterFactory}. */
+class OrcWriterFactoryTest {
+
+ @Test
+ void testNotOverrideInMemoryManager(@TempDir java.nio.file.Path tmpDir) throws IOException {
+ TestMemoryManager memoryManager = new TestMemoryManager();
+ OrcWriterFactory<Record> factory =
+ new TestOrcWriterFactory<>(
+ new RecordVectorizer("struct<_col0:string,_col1:int>"), memoryManager);
+ factory.create(new LocalDataOutputStream(tmpDir.resolve("file1").toFile()));
+ factory.create(new LocalDataOutputStream(tmpDir.resolve("file2").toFile()));
+
+ List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
+ assertThat(addedWriterPath).hasSize(2);
+ assertThat(addedWriterPath.get(1)).isNotEqualTo(addedWriterPath.get(0));
+ }
+
+ private static class TestOrcWriterFactory<T> extends OrcWriterFactory<T> {
+
+ private final MemoryManager memoryManager;
+
+ public TestOrcWriterFactory(Vectorizer<T> vectorizer, MemoryManager memoryManager) {
+ super(vectorizer);
+ this.memoryManager = checkNotNull(memoryManager);
+ }
+
+ @Override
+ protected OrcFile.WriterOptions getWriterOptions() {
+ OrcFile.WriterOptions options = super.getWriterOptions();
+ options.memory(memoryManager);
+ return options;
+ }
+ }
+
+ private static class TestMemoryManager implements MemoryManager {
+ private final List<Path> addedWriterPath = new ArrayList<>();
+
+ @Override
+ public void addWriter(Path path, long requestedAllocation, Callback callback) {
+ addedWriterPath.add(path);
+ }
+
+ public List<Path> getAddedWriterPath() {
+ return addedWriterPath;
+ }
+
+ @Override
+ public void removeWriter(Path path) {}
+
+ @Override
+ public void addedRow(int rows) {}
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java
new file mode 100644
index 00000000..6f1e7633
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/Record.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import java.io.Serializable;
+
+/** A sample type used for the integration test case. */
+public class Record implements Serializable {
+ private final String name;
+ private final int age;
+
+ public Record(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public int getAge() {
+ return this.age;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof Record)) {
+ return false;
+ }
+
+ Record otherRecord = (Record) other;
+
+ return this.name.equals(otherRecord.getName()) && this.age == otherRecord.getAge();
+ }
+}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java
new file mode 100644
index 00000000..d2c0485d
--- /dev/null
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/RecordVectorizer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.table.store.format.orc.writer.Vectorizer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A Vectorizer implementation used for tests.
+ *
+ * <p>It transforms an input element of type {@link Record} into a {@link VectorizedRowBatch}.
+ */
+public class RecordVectorizer extends Vectorizer<Record> implements Serializable {
+
+ public RecordVectorizer(String schema) {
+ super(schema);
+ }
+
+ @Override
+ public void vectorize(Record element, VectorizedRowBatch batch) throws IOException {
+ BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+ LongColumnVector intColVector = (LongColumnVector) batch.cols[1];
+
+ int row = batch.size++;
+
+ stringVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
+ intColVector.vector[row] = element.getAge();
+
+ this.addUserMetadata(
+ OrcBulkWriterTestUtil.USER_METADATA_KEY, OrcBulkWriterTestUtil.USER_METADATA_VALUE);
+ }
+}
diff --git a/flink-table-store-format/src/test/resources/test-data-decimal.orc b/flink-table-store-format/src/test/resources/test-data-decimal.orc
new file mode 100644
index 00000000..cb0f7b9d
Binary files /dev/null and b/flink-table-store-format/src/test/resources/test-data-decimal.orc differ
diff --git a/flink-table-store-format/src/test/resources/test-data-flat.orc b/flink-table-store-format/src/test/resources/test-data-flat.orc
new file mode 100644
index 00000000..db0ff15e
Binary files /dev/null and b/flink-table-store-format/src/test/resources/test-data-flat.orc differ
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
index 3f9be225..0af1dcad 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
@@ -201,19 +201,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>com.klarna</groupId>
<artifactId>hiverunner</artifactId>
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
index 88af7f2e..6c13899d 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -22,17 +22,22 @@ import org.apache.flink.core.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for using S3 in Spark. */
+@RunWith(Parameterized.class)
public class SparkS3ITCase {
@ClassRule public static final MinioTestContainer MINIO_CONTAINER = new MinioTestContainer();
@@ -54,16 +59,29 @@ public class SparkS3ITCase {
spark.sql("USE tablestore.db");
}
- @AfterClass
- public static void afterEach() {
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<String> parameters() {
+ return Arrays.asList("avro", "orc", "parquet");
+ }
+
+ private final String format;
+
+ public SparkS3ITCase(String format) {
+ this.format = format;
+ }
+
+ @After
+ public void afterEach() {
spark.sql("DROP TABLE T");
}
@Test
public void testWriteRead() {
spark.sql(
- "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
- + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+ String.format(
+ "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+ + " ('primary-key'='a', 'bucket'='4', 'file.format'='%s')",
+ format));
spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
diff --git a/pom.xml b/pom.xml
index 2d51715d..28a13c1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,13 +120,9 @@ under the License.
<flink.table.api.java.bridge>flink-table-api-java-bridge</flink.table.api.java.bridge>
<flink.table.runtime>flink-table-runtime</flink.table.runtime>
<flink.streaming.java>flink-streaming-java</flink.streaming.java>
- <flink.sql.orc>flink-sql-orc</flink.sql.orc>
- <flink.orc>flink-orc</flink.orc>
- <flink.parquet>flink-parquet</flink.parquet>
<flink.connector.kafka>flink-connector-kafka</flink.connector.kafka>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.test.utils>flink-test-utils</flink.test.utils>
- <flink.sql.parquet>flink-sql-parquet</flink.sql.parquet>
<janino.version>3.0.11</janino.version>
<mockito.version>3.4.6</mockito.version>
</properties>
@@ -315,13 +311,9 @@ under the License.
<flink.table.api.java.bridge>flink-table-api-java-bridge_${scala.binary.version}</flink.table.api.java.bridge>
<flink.table.runtime>flink-table-runtime_${scala.binary.version}</flink.table.runtime>
<flink.streaming.java>flink-streaming-java_${scala.binary.version}</flink.streaming.java>
- <flink.sql.orc>flink-sql-orc_${scala.binary.version}</flink.sql.orc>
- <flink.orc>flink-orc_${scala.binary.version}</flink.orc>
- <flink.parquet>flink-parquet_${scala.binary.version}</flink.parquet>
<flink.connector.kafka>flink-connector-kafka_${scala.binary.version}</flink.connector.kafka>
<flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
<flink.test.utils>flink-test-utils_${scala.binary.version}</flink.test.utils>
- <flink.sql.parquet>flink-sql-parquet_${scala.binary.version}</flink.sql.parquet>
</properties>
</profile>