Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/22 22:10:46 UTC
[9/9] flink git commit: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.
[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.
This closes #5043.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/200612ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/200612ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/200612ee
Branch: refs/heads/master
Commit: 200612ee0eaa42fdba141be138de172f86798f54
Parents: edbf8c9
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 13 14:54:54 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 22 23:11:30 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sourceSinks.md | 49 +
flink-connectors/flink-orc/pom.xml | 89 +-
.../org/apache/flink/orc/OrcRowInputFormat.java | 745 ++
.../org/apache/flink/orc/OrcTableSource.java | 455 +-
.../java/org/apache/flink/orc/OrcUtils.java | 2379 ++--
.../org/apache/flink/orc/RowOrcInputFormat.java | 241 -
.../apache/flink/orc/OrcRowInputFormatTest.java | 795 ++
.../apache/flink/orc/OrcTableSourceITCase.java | 134 +-
.../apache/flink/orc/OrcTableSourceTest.java | 266 +-
.../java/org/apache/flink/orc/OrcUtilsTest.java | 148 +
.../apache/flink/orc/RowOrcInputFormatTest.java | 472 -
.../test/resources/TestOrcFile.emptyFile.orc | Bin 523 -> 0 bytes
.../TestOrcFile.listliststructlong.orc | Bin 845 -> 0 bytes
.../src/test/resources/TestOrcFile.listlong.orc | Bin 627 -> 0 bytes
.../test/resources/TestOrcFile.liststring.orc | Bin 1298 -> 0 bytes
.../src/test/resources/TestOrcFile.test1.orc | Bin 1711 -> 0 bytes
.../test/resources/TestOrcFile.testDate1900.dat | 10000 -----------------
.../test/resources/TestOrcFile.testDate1900.orc | Bin 30941 -> 0 bytes
.../flink-orc/src/test/resources/decimal.dat | 6000 ----------
.../flink-orc/src/test/resources/decimal.orc | Bin 16337 -> 0 bytes
.../src/test/resources/demo-11-none.orc | Bin 5147970 -> 0 bytes
.../src/test/resources/test-data-decimal.orc | Bin 0 -> 16337 bytes
.../src/test/resources/test-data-flat.orc | Bin 0 -> 408522 bytes
.../src/test/resources/test-data-nested.orc | Bin 0 -> 1711 bytes
.../src/test/resources/test-data-nestedlist.orc | Bin 0 -> 845 bytes
.../src/test/resources/test-data-timetypes.orc | Bin 0 -> 30941 bytes
.../flink/api/java/typeutils/RowTypeInfo.java | 17 +
.../logical/FlinkLogicalTableSourceScan.scala | 16 +-
.../table/plan/util/RexProgramExtractor.scala | 12 +
29 files changed, 3306 insertions(+), 18512 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 0b4bdbe..7387358 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -47,6 +47,7 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or
| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics.
| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.8 topics.
| `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files.
+| `OrcTableSource` | `flink-orc` | Y | N | A `TableSource` for ORC files.
All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
@@ -485,6 +486,54 @@ val csvTableSource = CsvTableSource
{% top %}
+### OrcTableSource
+
+The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down.
+
+An `OrcTableSource` is created as shown below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// create Hadoop Configuration
+Configuration config = new Configuration();
+
+OrcTableSource orcTableSource = OrcTableSource.builder()
+ // path to ORC file(s)
+ .path("file:///path/to/data")
+ // schema of ORC files
+ .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+ // Hadoop configuration
+ .withConfiguration(config)
+ // build OrcTableSource
+ .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// create Hadoop Configuration
+val config = new Configuration()
+
+val orcTableSource = OrcTableSource.builder()
+ // path to ORC file(s)
+ .path("file:///path/to/data")
+ // schema of ORC files
+ .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+ // Hadoop configuration
+ .withConfiguration(config)
+ // build OrcTableSource
+ .build()
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** The `OrcTableSource` does not support ORC's `Union` type yet.
+
+{% top %}
+
Provided TableSinks
-------------------
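To put the new documentation in context, a complete batch program using the source could look like the sketch below. This is a minimal sketch on top of this commit, not part of it: the class name, path, schema, and query are illustrative assumptions.

    // a minimal sketch, assuming a batch Table API setup (hypothetical path, schema, and query)
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.orc.OrcTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    public class OrcTableSourceExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            OrcTableSource orcSrc = OrcTableSource.builder()
                .path("file:///path/to/data")
                .forOrcSchema("struct<name:string,age:int>")
                .build();

            // the optimizer pushes the query's projection and filter into the source
            tEnv.registerTableSource("OrcTable", orcSrc);
            Table result = tEnv.sql("SELECT name FROM OrcTable WHERE age > 20");

            // ... continue with result, e.g., convert it to a DataSet and run the job
        }
    }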
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/pom.xml b/flink-connectors/flink-orc/pom.xml
index 1ac7eaa..3ee5e49 100644
--- a/flink-connectors/flink-orc/pom.xml
+++ b/flink-connectors/flink-orc/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
- <version>1.4-SNAPSHOT</version>
+ <version>1.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -40,22 +40,39 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table_${scala.binary.version}</artifactId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ <scope>provided</scope>
+ <!-- Projects depending on this project won't depend on flink-table. -->
+ <optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
- <version>1.4.0</version>
+ <version>1.4.1</version>
+ <exclusions>
+ <!-- Exclude ORC's Hadoop dependency and pull in Flink's shaded Hadoop. -->
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Replacement for ORC's Hadoop dependency. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<!-- test dependencies -->
@@ -88,65 +105,7 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
- </dependencies>
- <build>
-
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <versionRange>[2.4,)</versionRange>
- <goals>
- <goal>single</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <versionRange>[1,)</versionRange>
- <goals>
- <goal>clean</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <versionRange>[1.7.7,)</versionRange>
- <goals>
- <goal>schema</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
new file mode 100644
index 0000000..4353cbc
--- /dev/null
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class);
+ // the number of rows read in a batch
+ private static final int DEFAULT_BATCH_SIZE = 1000;
+
+ // the number of rows to read in a batch
+ private int batchSize;
+ // the configuration to read with
+ private Configuration conf;
+ // the schema of the ORC files to read
+ private TypeDescription schema;
+
+ // the fields of the ORC schema that the returned Rows are composed of.
+ private int[] selectedFields;
+ // the type information of the Rows returned by this InputFormat.
+ private transient RowTypeInfo rowType;
+
+ // the ORC reader
+ private transient RecordReader orcRowsReader;
+ // the vectorized row data to be read in a batch
+ private transient VectorizedRowBatch rowBatch;
+ // the vector of rows that is read in a batch
+ private transient Row[] rows;
+
+ // the number of rows in the current batch
+ private transient int rowsInBatch;
+ // the index of the next row to return
+ private transient int nextRow;
+
+ private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+ /**
+ * Creates an OrcRowInputFormat.
+ *
+ * @param path The path to read ORC files from.
+ * @param schemaString The schema of the ORC files as String.
+ * @param orcConfig The configuration to read the ORC files with.
+ */
+ public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) {
+ this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE);
+ }
+
+ /**
+ * Creates an OrcRowInputFormat.
+ *
+ * @param path The path to read ORC files from.
+ * @param schemaString The schema of the ORC files as String.
+ * @param orcConfig The configuration to read the ORC files with.
+ * @param batchSize The number of Row objects to read in a batch.
+ */
+ public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig, int batchSize) {
+ this(path, TypeDescription.fromString(schemaString), orcConfig, batchSize);
+ }
+
+ /**
+ * Creates an OrcRowInputFormat.
+ *
+ * @param path The path to read ORC files from.
+ * @param orcSchema The schema of the ORC files as ORC TypeDescription.
+ * @param orcConfig The configuration to read the ORC files with.
+ * @param batchSize The number of Row objects to read in a batch.
+ */
+ public OrcRowInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
+ super(new Path(path));
+
+ // configure OrcRowInputFormat
+ this.schema = orcSchema;
+ this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+ this.conf = orcConfig;
+ this.batchSize = batchSize;
+
+ // set default selection mask, i.e., all fields.
+ this.selectedFields = new int[this.schema.getChildren().size()];
+ for (int i = 0; i < selectedFields.length; i++) {
+ this.selectedFields[i] = i;
+ }
+ }
+
+ /**
+ * Adds a filter predicate to reduce the number of rows to be returned by the input format.
+ * Multiple conjunctive predicates can be added by calling this method multiple times.
+ *
+ * <p>Note: Predicates can significantly reduce the amount of data that is read.
+ * However, the OrcRowInputFormat does not guarantee that all returned rows satisfy the
+ * predicates. Moreover, predicates are only applied if the referenced field is among the
+ * selected fields.
+ *
+ * @param predicate The filter predicate.
+ */
+ public void addPredicate(Predicate predicate) {
+ // validate
+ validatePredicate(predicate);
+ // add predicate
+ this.conjunctPredicates.add(predicate);
+ }
+
+ private void validatePredicate(Predicate pred) {
+ if (pred instanceof ColumnPredicate) {
+ // check column name
+ String colName = ((ColumnPredicate) pred).columnName;
+ if (!this.schema.getFieldNames().contains(colName)) {
+ throw new IllegalArgumentException("Predicate cannot be applied. " +
+ "Column '" + colName + "' does not exist in ORC schema.");
+ }
+ } else if (pred instanceof Not) {
+ validatePredicate(((Not) pred).child());
+ } else if (pred instanceof Or) {
+ for (Predicate p : ((Or) pred).children()) {
+ validatePredicate(p);
+ }
+ }
+ }
+
+ /**
+ * Selects the fields from the ORC schema that are returned by the InputFormat.
+ *
+ * @param selectedFields The indices of the fields of the ORC schema that are returned by the InputFormat.
+ */
+ public void selectFields(int... selectedFields) {
+ // set field mapping
+ this.selectedFields = selectedFields;
+ // adapt result type
+ this.rowType = RowTypeInfo.projectFields(this.rowType, selectedFields);
+ }
+
+ /**
+ * Computes the ORC projection mask of the fields to include from the selected fields.
+ *
+ * @return The ORC projection mask.
+ */
+ private boolean[] computeProjectionMask() {
+ // mask with all fields of the schema
+ boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+ // for each selected field
+ for (int inIdx : selectedFields) {
+ // set all nested fields of a selected field to true
+ TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+ for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+ projectionMask[i] = true;
+ }
+ }
+ return projectionMask;
+ }
+
+ @Override
+ public void openInputFormat() throws IOException {
+ super.openInputFormat();
+ // create and initialize the row batch
+ this.rows = new Row[batchSize];
+ for (int i = 0; i < batchSize; i++) {
+ rows[i] = new Row(selectedFields.length);
+ }
+ }
+
+ @Override
+ public void open(FileInputSplit fileSplit) throws IOException {
+
+ LOG.debug("Opening ORC file {}", fileSplit.getPath());
+
+ // open ORC file and create reader
+ org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
+ Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(conf));
+
+ // get offset and length for the stripes that start in the split
+ Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit(fileSplit, getStripes(orcReader));
+
+ // create ORC row reader configuration
+ Reader.Options options = getOptions(orcReader)
+ .schema(schema)
+ .range(offsetAndLength.f0, offsetAndLength.f1)
+ .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+ .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+ .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+ // configure filters
+ if (!conjunctPredicates.isEmpty()) {
+ SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+ b = b.startAnd();
+ for (Predicate predicate : conjunctPredicates) {
+ predicate.add(b);
+ }
+ b = b.end();
+ options.searchArgument(b.build(), new String[]{});
+ }
+
+ // configure selected fields
+ options.include(computeProjectionMask());
+
+ // create ORC row reader
+ this.orcRowsReader = orcReader.rows(options);
+
+ // assign ids
+ this.schema.getId();
+ // create row batch
+ this.rowBatch = schema.createRowBatch(batchSize);
+ rowsInBatch = 0;
+ nextRow = 0;
+ }
+
+ @VisibleForTesting
+ Reader.Options getOptions(Reader orcReader) {
+ return orcReader.options();
+ }
+
+ @VisibleForTesting
+ List<StripeInformation> getStripes(Reader orcReader) {
+ return orcReader.getStripes();
+ }
+
+ private Tuple2<Long, Long> getOffsetAndLengthForSplit(FileInputSplit split, List<StripeInformation> stripes) {
+ long splitStart = split.getStart();
+ long splitEnd = splitStart + split.getLength();
+
+ long readStart = Long.MAX_VALUE;
+ long readEnd = Long.MIN_VALUE;
+
+ for (StripeInformation s : stripes) {
+ if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+ // stripe starts in split, so it is included
+ readStart = Math.min(readStart, s.getOffset());
+ readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+ }
+ }
+
+ if (readStart < Long.MAX_VALUE) {
+ // at least one stripe is included
+ return Tuple2.of(readStart, readEnd - readStart);
+ } else {
+ return Tuple2.of(0L, 0L);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (orcRowsReader != null) {
+ this.orcRowsReader.close();
+ }
+ this.orcRowsReader = null;
+ }
+
+ @Override
+ public void closeInputFormat() throws IOException {
+ this.rows = null;
+ this.schema = null;
+ this.rowBatch = null;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return !ensureBatch();
+ }
+
+ /**
+ * Checks if there is at least one row left in the batch to return.
+ * If no more rows are available, it reads another batch of rows.
+ *
+ * @return Returns true if there is one more row to return, false otherwise.
+ * @throws IOException thrown if an exception occurs while reading a batch.
+ */
+ private boolean ensureBatch() throws IOException {
+
+ if (nextRow >= rowsInBatch) {
+ // No more rows available in the Rows array.
+ nextRow = 0;
+ // Try to read the next batch of rows from the ORC file.
+ boolean moreRows = orcRowsReader.nextBatch(rowBatch);
+
+ if (moreRows) {
+ // Load the data into the Rows array.
+ rowsInBatch = fillRows(rows, schema, rowBatch, selectedFields);
+ }
+ return moreRows;
+ }
+ // there is at least one Row left in the Rows array.
+ return true;
+ }
+
+ @Override
+ public Row nextRecord(Row reuse) throws IOException {
+ // return the next row
+ return rows[this.nextRow++];
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return rowType;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom serialization methods
+ // --------------------------------------------------------------------------------------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeInt(batchSize);
+ this.conf.write(out);
+ out.writeUTF(schema.toString());
+
+ out.writeInt(selectedFields.length);
+ for (int f : selectedFields) {
+ out.writeInt(f);
+ }
+
+ out.writeInt(conjunctPredicates.size());
+ for (Predicate p : conjunctPredicates) {
+ out.writeObject(p);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ batchSize = in.readInt();
+ org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+ configuration.readFields(in);
+
+ if (this.conf == null) {
+ this.conf = configuration;
+ }
+ this.schema = TypeDescription.fromString(in.readUTF());
+
+ this.selectedFields = new int[in.readInt()];
+ for (int i = 0; i < selectedFields.length; i++) {
+ this.selectedFields[i] = in.readInt();
+ }
+
+ this.conjunctPredicates = new ArrayList<>();
+ int numPreds = in.readInt();
+ for (int i = 0; i < numPreds; i++) {
+ conjunctPredicates.add((Predicate) in.readObject());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Classes to define predicates
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A filter predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public abstract static class Predicate implements Serializable {
+ protected abstract SearchArgument.Builder add(SearchArgument.Builder builder);
+ }
+
+ abstract static class ColumnPredicate extends Predicate {
+ final String columnName;
+ final PredicateLeaf.Type literalType;
+
+ ColumnPredicate(String columnName, PredicateLeaf.Type literalType) {
+ this.columnName = columnName;
+ this.literalType = literalType;
+ }
+
+ Object castLiteral(Serializable literal) {
+
+ switch (literalType) {
+ case LONG:
+ if (literal instanceof Byte) {
+ return new Long((Byte) literal);
+ } else if (literal instanceof Short) {
+ return new Long((Short) literal);
+ } else if (literal instanceof Integer) {
+ return new Long((Integer) literal);
+ } else if (literal instanceof Long) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException("A predicate on a LONG column requires an integer " +
+ "literal, i.e., Byte, Short, Integer, or Long.");
+ }
+ case FLOAT:
+ if (literal instanceof Float) {
+ return new Double((Float) literal);
+ } else if (literal instanceof Double) {
+ return literal;
+ } else if (literal instanceof BigDecimal) {
+ return ((BigDecimal) literal).doubleValue();
+ } else {
+ throw new IllegalArgumentException("A predicate on a FLOAT column requires a floating " +
+ "literal, i.e., Float or Double.");
+ }
+ case STRING:
+ if (literal instanceof String) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException("A predicate on a STRING column requires a floating " +
+ "literal, i.e., Float or Double.");
+ }
+ case BOOLEAN:
+ if (literal instanceof Boolean) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException("A predicate on a BOOLEAN column requires a Boolean literal.");
+ }
+ case DATE:
+ if (literal instanceof Date) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException("A predicate on a DATE column requires a java.sql.Date literal.");
+ }
+ case TIMESTAMP:
+ if (literal instanceof Timestamp) {
+ return literal;
+ } else {
+ throw new IllegalArgumentException("A predicate on a TIMESTAMP column requires a java.sql.Timestamp literal.");
+ }
+ case DECIMAL:
+ if (literal instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal));
+ } else {
+ throw new IllegalArgumentException("A predicate on a DECIMAL column requires a BigDecimal literal.");
+ }
+ default:
+ throw new IllegalArgumentException("Unknown literal type " + literalType);
+ }
+ }
+ }
+
+ abstract static class BinaryPredicate extends ColumnPredicate {
+ final Serializable literal;
+
+ BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType);
+ this.literal = literal;
+ }
+ }
+
+ /**
+ * An EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class Equals extends BinaryPredicate {
+ /**
+ * Creates an EQUALS predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.equals(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " = " + literal;
+ }
+ }
+
+ /**
+ * A null-safe EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class NullSafeEquals extends BinaryPredicate {
+ /**
+ * Creates a null-safe EQUALS predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.nullSafeEquals(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " = " + literal;
+ }
+ }
+
+ /**
+ * A LESS_THAN predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class LessThan extends BinaryPredicate {
+ /**
+ * Creates a LESS_THAN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.lessThan(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " < " + literal;
+ }
+ }
+
+ /**
+ * A LESS_THAN_EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class LessThanEquals extends BinaryPredicate {
+ /**
+ * Creates a LESS_THAN_EQUALS predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literal.
+ * @param literal The literal value to check the column against.
+ */
+ public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+ super(columnName, literalType, literal);
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.lessThanEquals(columnName, literalType, castLiteral(literal));
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " <= " + literal;
+ }
+ }
+
+ /**
+ * An IS_NULL predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class IsNull extends ColumnPredicate {
+ /**
+ * Creates an IS_NULL predicate.
+ *
+ * @param columnName The column to check for null.
+ * @param literalType The type of the column to check for null.
+ */
+ public IsNull(String columnName, PredicateLeaf.Type literalType) {
+ super(columnName, literalType);
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.isNull(columnName, literalType);
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " IS NULL";
+ }
+ }
+
+ /**
+ * A BETWEEN predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class Between extends ColumnPredicate {
+ private Serializable lowerBound;
+ private Serializable upperBound;
+
+ /**
+ * Creates a BETWEEN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literals.
+ * @param lowerBound The literal value of the (inclusive) lower bound to check the column against.
+ * @param upperBound The literal value of the (inclusive) upper bound to check the column against.
+ */
+ public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound) {
+ super(columnName, literalType);
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return builder.between(columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound));
+ }
+
+ @Override
+ public String toString() {
+ return lowerBound + " <= " + columnName + " <= " + upperBound;
+ }
+ }
+
+ /**
+ * An IN predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class In extends ColumnPredicate {
+ private Serializable[] literals;
+
+ /**
+ * Creates an IN predicate.
+ *
+ * @param columnName The column to check.
+ * @param literalType The type of the literals.
+ * @param literals The literal values to check the column against.
+ */
+ public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals) {
+ super(columnName, literalType);
+ this.literals = literals;
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ Object[] castedLiterals = new Object[literals.length];
+ for (int i = 0; i < literals.length; i++) {
+ castedLiterals[i] = castLiteral(literals[i]);
+ }
+ return builder.in(columnName, literalType, (Object[]) castedLiterals);
+ }
+
+ @Override
+ public String toString() {
+ return columnName + " IN " + Arrays.toString(literals);
+ }
+ }
+
+ /**
+ * A NOT predicate to negate a predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class Not extends Predicate {
+ private final Predicate pred;
+
+ /**
+ * Creates a NOT predicate.
+ *
+ * @param predicate The predicate to negate.
+ */
+ public Not(Predicate predicate) {
+ this.pred = predicate;
+ }
+
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ return pred.add(builder.startNot()).end();
+ }
+
+ protected Predicate child() {
+ return pred;
+ }
+
+ @Override
+ public String toString() {
+ return "NOT(" + pred.toString() + ")";
+ }
+ }
+
+ /**
+ * An OR predicate that can be evaluated by the OrcRowInputFormat.
+ */
+ public static class Or extends Predicate {
+ private final Predicate[] preds;
+
+ /**
+ * Creates an OR predicate.
+ *
+ * @param predicates The disjunctive predicates.
+ */
+ public Or(Predicate... predicates) {
+ this.preds = predicates;
+ }
+
+ @Override
+ protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+ SearchArgument.Builder withOr = builder.startOr();
+ for (Predicate p : preds) {
+ withOr = p.add(withOr);
+ }
+ return withOr.end();
+ }
+
+ protected Iterable<Predicate> children() {
+ return Arrays.asList(preds);
+ }
+
+ @Override
+ public String toString() {
+ return "OR(" + Arrays.toString(preds) + ")";
+ }
+ }
+}
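The input format above can also be used in plain DataSet programs, without the Table API. A hedged sketch of direct usage follows; the class name, path, schema, field indices, and age threshold are illustrative assumptions, not part of this commit.

    // a minimal sketch of direct OrcRowInputFormat usage (hypothetical path and schema)
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.orc.OrcRowInputFormat;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

    public class OrcRowInputFormatExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // reads with the default batch size of 1000 rows
            OrcRowInputFormat orcIF = new OrcRowInputFormat(
                "file:///path/to/data",
                "struct<name:string,age:int>",
                new Configuration());

            // return only 'name' (0) and 'age' (1); note that predicates are only
            // applied if the referenced field is among the selected fields
            orcIF.selectFields(0, 1);

            // age > 20, expressed as NOT(age <= 20); ORC evaluates predicates per
            // stripe / row group, so non-matching rows may still be returned
            orcIF.addPredicate(
                new OrcRowInputFormat.Not(
                    new OrcRowInputFormat.LessThanEquals("age", PredicateLeaf.Type.LONG, 20)));

            DataSet<Row> rows = env.createInput(orcIF);
            rows.print();
        }
    }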
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 0454ba4..b7c5378 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -18,111 +18,474 @@
package org.apache.flink.orc;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.orc.OrcRowInputFormat.Predicate;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.IsNotNull;
+import org.apache.flink.table.expressions.IsNull;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.expressions.UnaryExpression;
import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.orc.TypeDescription;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
/**
- * Creates a TableSource to read ORC file.
+ * A TableSource to read ORC files.
*
- * <p>The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ * <p>The {@link OrcTableSource} supports projection and filter push-down.</p>
*
- * <p>The OrcTableSource is used as shown in the example below.
+ * <p>An {@link OrcTableSource} is used as shown in the example below.
*
* <pre>
* {@code
- * String path = testInputURL.getPath();
- * String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>"
- * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * OrcTableSource orcSrc = OrcTableSource.builder()
+ * .path("file:///my/data/file.orc")
+ * .forOrcSchema("struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>")
+ * .build();
+ *
* tEnv.registerTableSource("orcTable", orcSrc);
* Table res = tEnv.sql("SELECT * FROM orcTable");
* }
* </pre>
*/
-public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+public class OrcTableSource
+ implements BatchTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
- private String path;
- private TypeDescription orcSchema;
- private RowTypeInfo typeInfo;
- private Configuration orcConfig;
- private int[] fieldMapping;
+ private static final int DEFAULT_BATCH_SIZE = 1000;
- /**
- * The ORC file path and schema.
- *
- * @param path the path of orc file
- * @param orcSchema schema of orc file
- */
- public OrcTableSource(String path, String orcSchema) {
- this(path, orcSchema, new Configuration());
- }
+ // path to read ORC files from
+ private final String path;
+ // schema of the ORC file
+ private final TypeDescription orcSchema;
+ // the schema of the Table
+ private final TableSchema tableSchema;
+ // the configuration to read the file
+ private final Configuration orcConfig;
+ // the number of rows to read in a batch
+ private final int batchSize;
+
+ // type information of the data returned by the InputFormat
+ private final RowTypeInfo typeInfo;
+ // list of selected ORC fields to return
+ private final int[] selectedFields;
+ // list of predicates to apply
+ private final Predicate[] predicates;
/**
- * The file path and schema of orc file, and configuration to read orc file .
+ * Creates an OrcTableSource from an ORC TypeDescription.
*
- * @param path the path of orc file
- * @param orcSchema schema of orc file
- * @param orcConfig configuration to read orc file
+ * @param path The path to read the ORC files from.
+ * @param orcSchema The schema of the ORC files as TypeDescription.
+ * @param orcConfig The configuration to read the ORC files.
+ * @param batchSize The number of Rows to read in a batch, default is 1000.
*/
- public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
- this(path, TypeDescription.fromString(orcSchema), orcConfig);
+ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
+ this(path, orcSchema, orcConfig, batchSize, null, null);
}
- public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
+ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig,
+ int batchSize, int[] selectedFields, Predicate[] predicates) {
+
+ Preconditions.checkNotNull(path, "Path must not be null.");
+ Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null.");
+ Preconditions.checkNotNull(path, "Configuration must not be null.");
+ Preconditions.checkArgument(batchSize > 0, "Batch size must be larger than null.");
this.path = path;
this.orcSchema = orcSchema;
this.orcConfig = orcConfig;
+ this.batchSize = batchSize;
+ this.selectedFields = selectedFields;
+ this.predicates = predicates;
- this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+ // determine the type information from the ORC schema
+ RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+ // set return type info
+ if (selectedFields == null) {
+ this.typeInfo = typeInfoFromSchema;
+ } else {
+ this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+ }
+ // create a TableSchema that corresponds to the ORC schema
+ this.tableSchema = new TableSchema(
+ typeInfoFromSchema.getFieldNames(),
+ typeInfoFromSchema.getFieldTypes()
+ );
}
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-
- RowOrcInputFormat orcIF = new RowOrcInputFormat(path, orcSchema, orcConfig);
- if (fieldMapping != null) {
- orcIF.setFieldMapping(fieldMapping);
+ OrcRowInputFormat orcIF = buildOrcInputFormat();
+ if (selectedFields != null) {
+ orcIF.selectFields(selectedFields);
+ }
+ if (predicates != null) {
+ for (OrcRowInputFormat.Predicate pred : predicates) {
+ orcIF.addPredicate(pred);
+ }
}
return execEnv.createInput(orcIF);
}
+ @VisibleForTesting
+ protected OrcRowInputFormat buildOrcInputFormat() {
+ return new OrcRowInputFormat(path, orcSchema, orcConfig, batchSize);
+ }
+
@Override
public TypeInformation<Row> getReturnType() {
return typeInfo;
}
@Override
- public TableSource<Row> projectFields(int[] fields) {
+ public TableSchema getTableSchema() {
+ return this.tableSchema;
+ }
- OrcTableSource copy = new OrcTableSource(path, orcSchema, orcConfig);
+ @Override
+ public TableSource<Row> projectFields(int[] selectedFields) {
+ // create a copy of the OrcTableSource with new selected fields
+ return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates);
+ }
- // set field mapping
- copy.fieldMapping = fields;
+ @Override
+ public TableSource<Row> applyPredicate(List<Expression> predicates) {
+ ArrayList<Predicate> orcPredicates = new ArrayList<>();
- // adapt TypeInfo
- TypeInformation[] fieldTypes = new TypeInformation[fields.length];
- String[] fieldNames = new String[fields.length];
- for (int i = 0; i < fields.length; i++) {
- fieldTypes[i] = this.typeInfo.getTypeAt(fields[i]);
- fieldNames[i] = this.typeInfo.getFieldNames()[fields[i]];
+ // we do not remove any predicates from the list because ORC does not fully apply predicates
+ for (Expression pred : predicates) {
+ Predicate orcPred = toOrcPredicate(pred);
+ if (orcPred != null) {
+ orcPredicates.add(orcPred);
+ }
}
- copy.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
- return copy;
+ return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
+ }
+
+ @Override
+ public boolean isFilterPushedDown() {
+ return this.predicates != null;
}
@Override
public String explainSource() {
- return "ORC Source file at path " + this.path + " with schema " + this.orcSchema;
+ return "OrcFile[path=" + path + ", schema=" + orcSchema + ", filter=" + predicateString() + "]";
+ }
+
+ private String predicateString() {
+ if (predicates != null) {
+ return "AND(" + Arrays.toString(predicates) + ")";
+ } else {
+ return "TRUE";
+ }
+ }
+
+ // Predicate conversion for filter push-down.
+
+ private Predicate toOrcPredicate(Expression pred) {
+ if (pred instanceof Or) {
+ Predicate c1 = toOrcPredicate(((Or) pred).left());
+ Predicate c2 = toOrcPredicate(((Or) pred).right());
+ if (c1 == null || c2 == null) {
+ return null;
+ } else {
+ return new OrcRowInputFormat.Or(c1, c2);
+ }
+ } else if (pred instanceof Not) {
+ Predicate c = toOrcPredicate(((Not) pred).child());
+ if (c == null) {
+ return null;
+ } else {
+ return new OrcRowInputFormat.Not(c);
+ }
+ } else if (pred instanceof BinaryComparison) {
+
+ BinaryComparison binComp = (BinaryComparison) pred;
+
+ if (!isValid(binComp)) {
+ // not a valid predicate
+ return null;
+ }
+ PredicateLeaf.Type litType = getLiteralType(binComp);
+ if (litType == null) {
+ // unsupported literal type
+ return null;
+ }
+
+ boolean literalOnRight = literalOnRight(binComp);
+ String colName = getColumnName(binComp);
+ Serializable literal = (Serializable) getLiteral(binComp);
+
+ if (pred instanceof EqualTo) {
+ return new OrcRowInputFormat.Equals(colName, litType, literal);
+ } else if (pred instanceof NotEqualTo) {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.Equals(colName, litType, literal));
+ } else if (pred instanceof GreaterThan) {
+ if (literalOnRight) {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+ } else {
+ return new OrcRowInputFormat.LessThan(colName, litType, literal);
+ }
+ } else if (pred instanceof GreaterThanOrEqual) {
+ if (literalOnRight) {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.LessThan(colName, litType, literal));
+ } else {
+ return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+ }
+ } else if (pred instanceof LessThan) {
+ if (literalOnRight) {
+ return new OrcRowInputFormat.LessThan(colName, litType, literal);
+ } else {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+ }
+ } else if (pred instanceof LessThanOrEqual) {
+ if (literalOnRight) {
+ return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+ } else {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.LessThan(colName, litType, literal));
+ }
+ } else {
+ // unsupported predicate
+ return null;
+ }
+ } else if (pred instanceof UnaryExpression) {
+
+ UnaryExpression unary = (UnaryExpression) pred;
+ if (!isValid(unary)) {
+ // not a valid predicate
+ return null;
+ }
+ PredicateLeaf.Type colType = toOrcType(((UnaryExpression) pred).child().resultType());
+ if (colType == null) {
+ // unsupported type
+ return null;
+ }
+
+ String colName = getColumnName(unary);
+
+ if (pred instanceof IsNull) {
+ return new OrcRowInputFormat.IsNull(colName, colType);
+ } else if (pred instanceof IsNotNull) {
+ return new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.IsNull(colName, colType));
+ } else {
+ // unsupported predicate
+ return null;
+ }
+ } else {
+ // unsupported predicate
+ return null;
+ }
+ }
+
+ private boolean isValid(UnaryExpression unary) {
+ return unary.child() instanceof Attribute;
+ }
+
+ private boolean isValid(BinaryComparison comp) {
+ return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+ (comp.left() instanceof Attribute && comp.right() instanceof Literal);
+ }
+
+ private boolean literalOnRight(BinaryComparison comp) {
+ if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+ return false;
+ } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+ return true;
+ } else {
+ throw new RuntimeException("Invalid binary comparison.");
+ }
+ }
+
+ private String getColumnName(UnaryExpression unary) {
+ return ((Attribute) unary.child()).name();
+ }
+
+ private String getColumnName(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return ((Attribute) comp.left()).name();
+ } else {
+ return ((Attribute) comp.right()).name();
+ }
+ }
+
+ private PredicateLeaf.Type getLiteralType(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return toOrcType(((Literal) comp.right()).resultType());
+ } else {
+ return toOrcType(((Literal) comp.left()).resultType());
+ }
+ }
+
+ private Object getLiteral(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return ((Literal) comp.right()).value();
+ } else {
+ return ((Literal) comp.left()).value();
+ }
+ }
+
+ private PredicateLeaf.Type toOrcType(TypeInformation<?> type) {
+ if (type == BasicTypeInfo.BYTE_TYPE_INFO ||
+ type == BasicTypeInfo.SHORT_TYPE_INFO ||
+ type == BasicTypeInfo.INT_TYPE_INFO ||
+ type == BasicTypeInfo.LONG_TYPE_INFO) {
+ return PredicateLeaf.Type.LONG;
+ } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO ||
+ type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+ return PredicateLeaf.Type.FLOAT;
+ } else if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ return PredicateLeaf.Type.BOOLEAN;
+ } else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+ return PredicateLeaf.Type.STRING;
+ } else if (type == SqlTimeTypeInfo.TIMESTAMP) {
+ return PredicateLeaf.Type.TIMESTAMP;
+ } else if (type == SqlTimeTypeInfo.DATE) {
+ return PredicateLeaf.Type.DATE;
+ } else if (type == BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+ return PredicateLeaf.Type.DECIMAL;
+ } else {
+ // unsupported type
+ return null;
+ }
+ }
+
+ // Builder
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Constructs an {@link OrcTableSource}.
+ */
+ public static class Builder {
+
+ private String path;
+
+ private TypeDescription schema;
+
+ private Configuration config;
+
+ private int batchSize = 0;
+
+ /**
+ * Sets the path of the ORC file(s).
+ *
+ * @param path The path of the ORC file(s).
+ * @return The builder.
+ */
+ public Builder path(String path) {
+ Preconditions.checkNotNull(path, "Path must not be null.");
+ Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty.");
+ this.path = path;
+ return this;
+ }
+
+ /**
+ * Sets the ORC schema of the files to read as a String.
+ *
+ * @param orcSchema The ORC schema of the files to read as a String.
+ * @return The builder.
+ */
+ public Builder forOrcSchema(String orcSchema) {
+ Preconditions.checkNotNull(orcSchema, "ORC schema must not be null.");
+ this.schema = TypeDescription.fromString(orcSchema);
+ return this;
+ }
+
+ /**
+ * Sets the ORC schema of the files to read as a {@link TypeDescription}.
+ *
+ * @param orcSchema The ORC schema of the files to read as a TypeDescription.
+ * @return The builder.
+ */
+ public Builder forOrcSchema(TypeDescription orcSchema) {
+ Preconditions.checkNotNull(orcSchema, "ORC Schema must not be null.");
+ this.schema = orcSchema;
+ return this;
+ }
+
+ /**
+ * Sets a Hadoop {@link Configuration} for the ORC reader. If no configuration is provided,
+ * an empty configuration is used.
+ *
+ * @param config The Hadoop Configuration for the ORC reader.
+ * @return The builder.
+ */
+ public Builder withConfiguration(Configuration config) {
+ Preconditions.checkNotNull(config, "Configuration must not be null.");
+ this.config = config;
+ return this;
+ }
+
+ /**
+ * Sets the number of rows that are read in a batch. If not configured, the ORC files are
+ * read with a batch size of 1000.
+ *
+ * @param batchSize The number of rows that are read in a batch.
+ * @return The builder.
+ */
+ public Builder withBatchSize(int batchSize) {
+ Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than zero.");
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ /**
+ * Builds the OrcTableSource for this builder.
+ *
+ * @return The OrcTableSource for this builder.
+ */
+ public OrcTableSource build() {
+ Preconditions.checkNotNull(this.path, "Path must not be null.");
+ Preconditions.checkNotNull(this.schema, "ORC schema must not be null.");
+ if (this.config == null) {
+ this.config = new Configuration();
+ }
+ if (this.batchSize == 0) {
+ // set default batch size
+ this.batchSize = DEFAULT_BATCH_SIZE;
+ }
+ return new OrcTableSource(this.path, this.schema, this.config, this.batchSize);
+ }
+
}
}
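Taken together, the new builder has four settings, two required and two optional. A sketch that touches all of them (the path and schema strings are placeholders):

    // a sketch of the full builder surface (hypothetical path and schema)
    Configuration hadoopConfig = new Configuration();   // org.apache.hadoop.conf.Configuration

    OrcTableSource orcSource = OrcTableSource.builder()
        .path("file:///path/to/data")                   // required
        .forOrcSchema("struct<name:string,age:int>")    // required; a TypeDescription overload also exists
        .withConfiguration(hadoopConfig)                // optional; defaults to an empty Configuration
        .withBatchSize(2000)                            // optional; defaults to 1000 rows per batch
        .build();

Note that projectFields() and applyPredicate() are not meant to be called from user code: the Table API optimizer invokes them when it pushes a query's projections and filters into the source, and isFilterPushedDown() reports whether a filter was pushed.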