You are viewing a plain-text version of this content. The canonical link for it is available in the original (HTML) version of this message; the hyperlink is not preserved in plain text.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 09:50:04 UTC
[flink-table-store] branch master updated: [FLINK-28072] Set Hadoop FileSystem for Orc reader
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6d04cd57 [FLINK-28072] Set Hadoop FileSystem for Orc reader
6d04cd57 is described below
commit 6d04cd57e77aa1d735dac556c2f5abb2f58e1d8f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 19 17:49:59 2022 +0800
[FLINK-28072] Set Hadoop FileSystem for Orc reader
This closes #218
---
.../store/table/AppendOnlyFileStoreTableTest.java | 5 -
.../ChangelogValueCountFileStoreTableTest.java | 5 -
.../table/ChangelogWithKeyFileStoreTableTest.java | 5 -
.../table/store/table/FileStoreTableTestBase.java | 13 ++
.../table/store/table/WritePreemptMemoryTest.java | 5 -
.../store/format/fs/HadoopReadOnlyFileSystem.java | 195 +++++++++++++++++++++
.../store/format/orc/OrcFileStatsExtractor.java | 6 +-
.../store/format/orc/OrcInputFormatFactory.java | 5 +-
.../flink/table/store/format/orc/OrcShimImpl.java | 163 +++++++++++++++++
9 files changed, 374 insertions(+), 28 deletions(-)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 57cfb1bd..32ef5e7a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
@@ -34,7 +33,6 @@ import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
@@ -46,8 +44,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link AppendOnlyFileStoreTable}. */
public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
- @TempDir java.nio.file.Path tempDir;
-
@Test
public void testBatchReadWrite() throws Exception {
writeData();
@@ -165,7 +161,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
- Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 9f3604d3..894f5d85 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
@@ -35,7 +34,6 @@ import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
@@ -47,8 +45,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ChangelogValueCountFileStoreTable}. */
public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBase {
- @TempDir java.nio.file.Path tempDir;
-
@Test
public void testBatchReadWrite() throws Exception {
writeData();
@@ -168,7 +164,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
- Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 6a19f043..24bf72e6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
@@ -35,7 +34,6 @@ import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
@@ -47,8 +45,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
- @TempDir java.nio.file.Path tempDir;
-
@Test
public void testSequenceNumber() throws Exception {
FileStoreTable table =
@@ -257,7 +253,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
- Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 8b9250bf..a091c12c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -29,6 +30,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.Re
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
@@ -37,7 +39,9 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
@@ -80,6 +84,15 @@ public abstract class FileStoreTableTestBase {
rowData ->
rowData.getRowKind().shortString() + " " + BATCH_ROW_TO_STRING.apply(rowData);
+ @TempDir java.nio.file.Path tempDir;
+
+ protected Path tablePath;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ }
+
@Test
public void testOverwrite() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 7252a6a6..326de0dc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.WriteMode;
@@ -34,7 +33,6 @@ import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
@@ -48,8 +46,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
public class WritePreemptMemoryTest extends FileStoreTableTestBase {
- @TempDir java.nio.file.Path tempDir;
-
@Test
public void writeMultiplePartitions() throws Exception {
testWritePreemptMemory(false);
@@ -94,7 +90,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
- Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java
new file mode 100644
index 00000000..6c922bdd
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.fs;
+
+import org.apache.flink.util.IOUtils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * A read only {@link FileSystem} that wraps an {@link org.apache.flink.core.fs.FileSystem Flink
+ * File System}.
+ *
+ * <p>Only {@link #getUri}, {@link #open} and {@link #getFileStatus} are supported; create, append,
+ * rename, delete, listing, mkdirs and working-directory operations throw {@link
+ * UnsupportedOperationException}.
+ */
+public class HadoopReadOnlyFileSystem extends FileSystem {
+
+    /** The Flink file system that serves every read and status lookup. */
+    private final org.apache.flink.core.fs.FileSystem fs;
+
+    /**
+     * Creates a read-only Hadoop view of the given Flink file system.
+     *
+     * @param fs the Flink file system to delegate to
+     */
+    public HadoopReadOnlyFileSystem(org.apache.flink.core.fs.FileSystem fs) {
+        this.fs = fs;
+    }
+
+    @Override
+    public URI getUri() {
+        return fs.getUri();
+    }
+
+    @Override
+    public FSDataInputStream open(Path path) throws IOException {
+        return new FSDataInputStream(new FSDataWrappedInputStream(fs.open(toFlinkPath(path))));
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        return new FSDataInputStream(
+                new FSDataWrappedInputStream(fs.open(toFlinkPath(path), bufferSize)));
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        return toHadoopStatus(fs.getFileStatus(toFlinkPath(path)));
+    }
+
+    private static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
+        return new org.apache.flink.core.fs.Path(path.toUri());
+    }
+
+    private static Path toHadoopPath(org.apache.flink.core.fs.Path path) {
+        return new Path(path.toUri());
+    }
+
+    /**
+     * Converts a Flink file status into a Hadoop one. Permission, owner and group are not taken
+     * from the Flink status; they are passed as {@code null}.
+     */
+    private static FileStatus toHadoopStatus(org.apache.flink.core.fs.FileStatus status) {
+        return new FileStatus(
+                status.getLen(),
+                status.isDir(),
+                status.getReplication(),
+                status.getBlockSize(),
+                status.getModificationTime(),
+                status.getAccessTime(),
+                null,
+                null,
+                null,
+                toHadoopPath(status.getPath()));
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    public FSDataOutputStream create(
+            Path f,
+            FsPermission permission,
+            boolean overwrite,
+            int bufferSize,
+            short replication,
+            long blockSize,
+            Progressable progress)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * An {@link InputStream} that adapts a {@link org.apache.flink.core.fs.FSDataInputStream} to
+     * Hadoop's {@link Seekable} and {@link PositionedReadable} so it can be wrapped in a Hadoop
+     * {@link FSDataInputStream}.
+     *
+     * <p>Positional reads seek the underlying stream and restore the previous offset afterwards.
+     * No synchronization is performed, so an instance must not be shared between threads.
+     */
+    private static class FSDataWrappedInputStream extends InputStream
+            implements Seekable, PositionedReadable {
+
+        private final org.apache.flink.core.fs.FSDataInputStream fsDataInputStream;
+
+        private FSDataWrappedInputStream(
+                org.apache.flink.core.fs.FSDataInputStream fsDataInputStream) {
+            this.fsDataInputStream = fsDataInputStream;
+        }
+
+        @Override
+        public int read() throws IOException {
+            return fsDataInputStream.read();
+        }
+
+        // Delegate bulk reads: InputStream's default would call read() once per byte.
+        @Override
+        public int read(byte[] buffer, int offset, int length) throws IOException {
+            return fsDataInputStream.read(buffer, offset, length);
+        }
+
+        @Override
+        public long skip(long n) throws IOException {
+            return fsDataInputStream.skip(n);
+        }
+
+        @Override
+        public int available() throws IOException {
+            return fsDataInputStream.available();
+        }
+
+        // Closing the Hadoop stream ends up here; without this override the Flink stream leaks.
+        @Override
+        public void close() throws IOException {
+            fsDataInputStream.close();
+        }
+
+        /** Reads from {@code position} without changing the current offset of the stream. */
+        @Override
+        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+            long previousPos = fsDataInputStream.getPos();
+            try {
+                fsDataInputStream.seek(position);
+                return fsDataInputStream.read(buffer, offset, length);
+            } finally {
+                fsDataInputStream.seek(previousPos);
+            }
+        }
+
+        /**
+         * Reads exactly {@code length} bytes from {@code position} without changing the current
+         * offset of the stream.
+         */
+        @Override
+        public void readFully(long position, byte[] buffer, int offset, int length)
+                throws IOException {
+            long previousPos = fsDataInputStream.getPos();
+            try {
+                fsDataInputStream.seek(position);
+                IOUtils.readFully(fsDataInputStream, buffer, offset, length);
+            } finally {
+                fsDataInputStream.seek(previousPos);
+            }
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer) throws IOException {
+            readFully(position, buffer, 0, buffer.length);
+        }
+
+        @Override
+        public void seek(long pos) throws IOException {
+            fsDataInputStream.seek(pos);
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return fsDataInputStream.getPos();
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) {
+            return false;
+        }
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index fa625c53..39fe6d91 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -36,7 +36,6 @@ import org.apache.orc.DateColumnStatistics;
import org.apache.orc.DecimalColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.TimestampColumnStatistics;
@@ -58,10 +57,7 @@ public class OrcFileStatsExtractor implements FileStatsExtractor {
@Override
public FieldStats[] extract(Path path) throws IOException {
- org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
- Reader reader =
- OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
-
+ Reader reader = OrcShimImpl.createReader(new Configuration(), path);
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
TypeDescription schema = reader.getSchema();
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
index 365b52d7..e5e5e3ec 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.utils.ReflectionUtils;
@@ -65,7 +64,7 @@ public class OrcInputFormatFactory {
return ReflectionUtils.invokeStaticMethod(
formatClass,
"createPartitionedFormat",
- OrcShim.defaultShim(),
+ new OrcShimImpl(),
conf,
type,
Collections.emptyList(),
@@ -90,7 +89,7 @@ public class OrcInputFormatFactory {
return ReflectionUtils.invokeStaticMethod(
formatClass,
"createPartitionedFormat",
- OrcShim.defaultShim(),
+ new OrcShimImpl(),
conf,
type,
Collections.emptyList(),
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
new file mode 100644
index 00000000..69fd951e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.shim.OrcShim;
+import org.apache.flink.orc.vector.HiveOrcBatchWrapper;
+import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link OrcShim} for table store.
+ *
+ * <p>This is copied from flink-orc except filesystem setting.
+ */
+public class OrcShimImpl implements OrcShim<VectorizedRowBatch> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public RecordReader createRecordReader(
+ Configuration conf,
+ TypeDescription schema,
+ int[] selectedFields,
+ List<OrcFilters.Predicate> conjunctPredicates,
+ org.apache.flink.core.fs.Path path,
+ long splitStart,
+ long splitLength)
+ throws IOException {
+ Reader orcReader = createReader(conf, path);
+
+ // get offset and length for the stripes that start in the split
+ Tuple2<Long, Long> offsetAndLength =
+ getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
+
+ // create ORC row reader configuration
+ Reader.Options options =
+ new Reader.Options()
+ .schema(schema)
+ .range(offsetAndLength.f0, offsetAndLength.f1)
+ .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+ .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+ .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+ // configure filters
+ if (!conjunctPredicates.isEmpty()) {
+ SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+ b = b.startAnd();
+ for (OrcFilters.Predicate predicate : conjunctPredicates) {
+ predicate.add(b);
+ }
+ b = b.end();
+ options.searchArgument(b.build(), new String[] {});
+ }
+
+ // configure selected fields
+ options.include(computeProjectionMask(schema, selectedFields));
+
+ // create ORC row reader
+ RecordReader orcRowsReader = orcReader.rows(options);
+
+ // assign ids
+ schema.getId();
+
+ return orcRowsReader;
+ }
+
+ @Override
+ public HiveOrcBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize) {
+ return new HiveOrcBatchWrapper(schema.createRowBatch(batchSize));
+ }
+
+ @Override
+ public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException {
+ return reader.nextBatch(rowBatch);
+ }
+
+ private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
+ long splitStart, long splitLength, List<StripeInformation> stripes) {
+ long splitEnd = splitStart + splitLength;
+ long readStart = Long.MAX_VALUE;
+ long readEnd = Long.MIN_VALUE;
+
+ for (StripeInformation s : stripes) {
+ if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+ // stripe starts in split, so it is included
+ readStart = Math.min(readStart, s.getOffset());
+ readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+ }
+ }
+
+ if (readStart < Long.MAX_VALUE) {
+ // at least one split is included
+ return Tuple2.of(readStart, readEnd - readStart);
+ } else {
+ return Tuple2.of(0L, 0L);
+ }
+ }
+
+ /**
+ * Computes the ORC projection mask of the fields to include from the selected
+ * fields.rowOrcInputFormat.nextRecord(null).
+ *
+ * @return The ORC projection mask.
+ */
+ private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
+ // mask with all fields of the schema
+ boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+ // for each selected field
+ for (int inIdx : selectedFields) {
+ // set all nested fields of a selected field to true
+ TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+ for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+ projectionMask[i] = true;
+ }
+ }
+ return projectionMask;
+ }
+
+ public static Reader createReader(Configuration conf, org.apache.flink.core.fs.Path path)
+ throws IOException {
+ // open ORC file and create reader
+ Path hPath = new Path(path.toUri());
+
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+
+ // configure filesystem from Flink filesystem
+ readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
+
+ return OrcFile.createReader(hPath, readerOptions);
+ }
+}