Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 09:50:04 UTC

[flink-table-store] branch master updated: [FLINK-28072] Set Hadoop FileSystem for Orc reader

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d04cd57 [FLINK-28072] Set Hadoop FileSystem for Orc reader
6d04cd57 is described below

commit 6d04cd57e77aa1d735dac556c2f5abb2f58e1d8f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 19 17:49:59 2022 +0800

    [FLINK-28072] Set Hadoop FileSystem for Orc reader
    
    This closes #218
---
 .../store/table/AppendOnlyFileStoreTableTest.java  |   5 -
 .../ChangelogValueCountFileStoreTableTest.java     |   5 -
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   5 -
 .../table/store/table/FileStoreTableTestBase.java  |  13 ++
 .../table/store/table/WritePreemptMemoryTest.java  |   5 -
 .../store/format/fs/HadoopReadOnlyFileSystem.java  | 195 +++++++++++++++++++++
 .../store/format/orc/OrcFileStatsExtractor.java    |   6 +-
 .../store/format/orc/OrcInputFormatFactory.java    |   5 +-
 .../flink/table/store/format/orc/OrcShimImpl.java  | 163 +++++++++++++++++
 9 files changed, 374 insertions(+), 28 deletions(-)
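
In short, the change routes ORC reads through Flink's FileSystem abstraction: the new
OrcShimImpl.createReader wraps the FileSystem resolved from the Flink path in a
HadoopReadOnlyFileSystem and passes it to OrcFile.readerOptions(conf).filesystem(...),
instead of letting ORC resolve the path through Hadoop's own FileSystem lookup. A minimal
sketch of how a caller might exercise the new code path (the local file path below is
illustrative only, not part of the commit):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.format.orc.OrcShimImpl;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.Reader;

    // Sketch only: the reader is backed by the Flink FileSystem for this path,
    // which is what readerOptions.filesystem(new HadoopReadOnlyFileSystem(...)) provides.
    Path path = new Path("file:///tmp/data.orc"); // illustrative path
    Reader reader = OrcShimImpl.createReader(new Configuration(), path);
    long rowCount = reader.getNumberOfRows();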

diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 57cfb1bd..32ef5e7a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
@@ -34,7 +33,6 @@ import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,8 +44,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link AppendOnlyFileStoreTable}. */
 public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
 
-    @TempDir java.nio.file.Path tempDir;
-
     @Test
     public void testBatchReadWrite() throws Exception {
         writeData();
@@ -165,7 +161,6 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
-        Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 9f3604d3..894f5d85 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
@@ -35,7 +34,6 @@ import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,8 +45,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link ChangelogValueCountFileStoreTable}. */
 public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBase {
 
-    @TempDir java.nio.file.Path tempDir;
-
     @Test
     public void testBatchReadWrite() throws Exception {
         writeData();
@@ -168,7 +164,6 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
-        Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 6a19f043..24bf72e6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
@@ -35,7 +34,6 @@ import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,8 +45,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link ChangelogWithKeyFileStoreTable}. */
 public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
-    @TempDir java.nio.file.Path tempDir;
-
     @Test
     public void testSequenceNumber() throws Exception {
         FileStoreTable table =
@@ -257,7 +253,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
-        Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 8b9250bf..a091c12c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -29,6 +30,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.Re
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
@@ -37,7 +39,9 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -80,6 +84,15 @@ public abstract class FileStoreTableTestBase {
             rowData ->
                     rowData.getRowKind().shortString() + " " + BATCH_ROW_TO_STRING.apply(rowData);
 
+    @TempDir java.nio.file.Path tempDir;
+
+    protected Path tablePath;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+    }
+
     @Test
     public void testOverwrite() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 7252a6a6..326de0dc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
@@ -34,7 +33,6 @@ import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,8 +46,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link ChangelogWithKeyFileStoreTable}. */
 public class WritePreemptMemoryTest extends FileStoreTableTestBase {
 
-    @TempDir java.nio.file.Path tempDir;
-
     @Test
     public void writeMultiplePartitions() throws Exception {
         testWritePreemptMemory(false);
@@ -94,7 +90,6 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
-        Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java
new file mode 100644
index 00000000..6c922bdd
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/fs/HadoopReadOnlyFileSystem.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.fs;
+
+import org.apache.flink.util.IOUtils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * A read-only Hadoop {@link FileSystem} that wraps an {@link org.apache.flink.core.fs.FileSystem
+ * Flink FileSystem}.
+ */
+public class HadoopReadOnlyFileSystem extends FileSystem {
+
+    private final org.apache.flink.core.fs.FileSystem fs;
+
+    public HadoopReadOnlyFileSystem(org.apache.flink.core.fs.FileSystem fs) {
+        this.fs = fs;
+    }
+
+    @Override
+    public URI getUri() {
+        return fs.getUri();
+    }
+
+    @Override
+    public FSDataInputStream open(Path path) throws IOException {
+        return new FSDataInputStream(new FSDataWrappedInputStream(fs.open(toFlinkPath(path))));
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        return new FSDataInputStream(
+                new FSDataWrappedInputStream(fs.open(toFlinkPath(path), bufferSize)));
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        return toHadoopStatus(fs.getFileStatus(toFlinkPath(path)));
+    }
+
+    private static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
+        return new org.apache.flink.core.fs.Path(path.toUri());
+    }
+
+    private static Path toHadoopPath(org.apache.flink.core.fs.Path path) {
+        return new Path(path.toUri());
+    }
+
+    private static FileStatus toHadoopStatus(org.apache.flink.core.fs.FileStatus status) {
+        return new FileStatus(
+                status.getLen(),
+                status.isDir(),
+                status.getReplication(),
+                status.getBlockSize(),
+                status.getModificationTime(),
+                status.getAccessTime(),
+                null,
+                null,
+                null,
+                toHadoopPath(status.getPath()));
+    }
+
+    // --------------------- unsupported methods ----------------------------
+
+    @Override
+    public FSDataOutputStream create(
+            Path f,
+            FsPermission permission,
+            boolean overwrite,
+            int bufferSize,
+            short replication,
+            long blockSize,
+            Progressable progress)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * An {@link InputStream} that wraps a Flink {@link org.apache.flink.core.fs.FSDataInputStream}
+     * so it can be read through the Hadoop API.
+     */
+    private static class FSDataWrappedInputStream extends InputStream
+            implements Seekable, PositionedReadable {
+
+        private final org.apache.flink.core.fs.FSDataInputStream fsDataInputStream;
+
+        private FSDataWrappedInputStream(
+                org.apache.flink.core.fs.FSDataInputStream fsDataInputStream) {
+            this.fsDataInputStream = fsDataInputStream;
+        }
+
+        @Override
+        public int read() throws IOException {
+            return fsDataInputStream.read();
+        }
+
+        @Override
+        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+            fsDataInputStream.seek(position);
+            return fsDataInputStream.read(buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer, int offset, int length)
+                throws IOException {
+            fsDataInputStream.seek(position);
+            IOUtils.readFully(fsDataInputStream, buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer) throws IOException {
+            readFully(position, buffer, 0, buffer.length);
+        }
+
+        @Override
+        public void seek(long pos) throws IOException {
+            fsDataInputStream.seek(pos);
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return fsDataInputStream.getPos();
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) {
+            return false;
+        }
+    }
+}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
index fa625c53..39fe6d91 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileStatsExtractor.java
@@ -36,7 +36,6 @@ import org.apache.orc.DateColumnStatistics;
 import org.apache.orc.DecimalColumnStatistics;
 import org.apache.orc.DoubleColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.TimestampColumnStatistics;
@@ -58,10 +57,7 @@ public class OrcFileStatsExtractor implements FileStatsExtractor {
 
     @Override
     public FieldStats[] extract(Path path) throws IOException {
-        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
-        Reader reader =
-                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
-
+        Reader reader = OrcShimImpl.createReader(new Configuration(), path);
         long rowCount = reader.getNumberOfRows();
         ColumnStatistics[] columnStatistics = reader.getStatistics();
         TypeDescription schema = reader.getSchema();
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
index 365b52d7..e5e5e3ec 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcInputFormatFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.utils.ReflectionUtils;
@@ -65,7 +64,7 @@ public class OrcInputFormatFactory {
             return ReflectionUtils.invokeStaticMethod(
                     formatClass,
                     "createPartitionedFormat",
-                    OrcShim.defaultShim(),
+                    new OrcShimImpl(),
                     conf,
                     type,
                     Collections.emptyList(),
@@ -90,7 +89,7 @@ public class OrcInputFormatFactory {
             return ReflectionUtils.invokeStaticMethod(
                     formatClass,
                     "createPartitionedFormat",
-                    OrcShim.defaultShim(),
+                    new OrcShimImpl(),
                     conf,
                     type,
                     Collections.emptyList(),
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
new file mode 100644
index 00000000..69fd951e
--- /dev/null
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcShimImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.orc;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.shim.OrcShim;
+import org.apache.flink.orc.vector.HiveOrcBatchWrapper;
+import org.apache.flink.table.store.format.fs.HadoopReadOnlyFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An {@link OrcShim} for the table store.
+ *
+ * <p>This is copied from flink-orc, except for the filesystem setting.
+ */
+public class OrcShimImpl implements OrcShim<VectorizedRowBatch> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public RecordReader createRecordReader(
+            Configuration conf,
+            TypeDescription schema,
+            int[] selectedFields,
+            List<OrcFilters.Predicate> conjunctPredicates,
+            org.apache.flink.core.fs.Path path,
+            long splitStart,
+            long splitLength)
+            throws IOException {
+        Reader orcReader = createReader(conf, path);
+
+        // get offset and length for the stripes that start in the split
+        Tuple2<Long, Long> offsetAndLength =
+                getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes());
+
+        // create ORC row reader configuration
+        Reader.Options options =
+                new Reader.Options()
+                        .schema(schema)
+                        .range(offsetAndLength.f0, offsetAndLength.f1)
+                        .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+                        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+                        .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+        // configure filters
+        if (!conjunctPredicates.isEmpty()) {
+            SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+            b = b.startAnd();
+            for (OrcFilters.Predicate predicate : conjunctPredicates) {
+                predicate.add(b);
+            }
+            b = b.end();
+            options.searchArgument(b.build(), new String[] {});
+        }
+
+        // configure selected fields
+        options.include(computeProjectionMask(schema, selectedFields));
+
+        // create ORC row reader
+        RecordReader orcRowsReader = orcReader.rows(options);
+
+        // assign ids
+        schema.getId();
+
+        return orcRowsReader;
+    }
+
+    @Override
+    public HiveOrcBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize) {
+        return new HiveOrcBatchWrapper(schema.createRowBatch(batchSize));
+    }
+
+    @Override
+    public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException {
+        return reader.nextBatch(rowBatch);
+    }
+
+    private static Tuple2<Long, Long> getOffsetAndLengthForSplit(
+            long splitStart, long splitLength, List<StripeInformation> stripes) {
+        long splitEnd = splitStart + splitLength;
+        long readStart = Long.MAX_VALUE;
+        long readEnd = Long.MIN_VALUE;
+
+        for (StripeInformation s : stripes) {
+            if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+                // stripe starts in split, so it is included
+                readStart = Math.min(readStart, s.getOffset());
+                readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+            }
+        }
+
+        if (readStart < Long.MAX_VALUE) {
+            // at least one stripe starts within the split
+            return Tuple2.of(readStart, readEnd - readStart);
+        } else {
+            return Tuple2.of(0L, 0L);
+        }
+    }
+
+    /**
+     * Computes the ORC projection mask of the fields to include from the selected
+     * fields.
+     *
+     * @return The ORC projection mask.
+     */
+    private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) {
+        // mask with all fields of the schema
+        boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+        // for each selected field
+        for (int inIdx : selectedFields) {
+            // set all nested fields of a selected field to true
+            TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+            for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+                projectionMask[i] = true;
+            }
+        }
+        return projectionMask;
+    }
+
+    public static Reader createReader(Configuration conf, org.apache.flink.core.fs.Path path)
+            throws IOException {
+        // open ORC file and create reader
+        Path hPath = new Path(path.toUri());
+
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+
+        // configure filesystem from Flink filesystem
+        readerOptions.filesystem(new HadoopReadOnlyFileSystem(path.getFileSystem()));
+
+        return OrcFile.createReader(hPath, readerOptions);
+    }
+}