You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/22 22:10:46 UTC

[9/9] flink git commit: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

This closes #5043.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/200612ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/200612ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/200612ee

Branch: refs/heads/master
Commit: 200612ee0eaa42fdba141be138de172f86798f54
Parents: edbf8c9
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 13 14:54:54 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 22 23:11:30 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |    49 +
 flink-connectors/flink-orc/pom.xml              |    89 +-
 .../org/apache/flink/orc/OrcRowInputFormat.java |   745 ++
 .../org/apache/flink/orc/OrcTableSource.java    |   455 +-
 .../java/org/apache/flink/orc/OrcUtils.java     |  2379 ++--
 .../org/apache/flink/orc/RowOrcInputFormat.java |   241 -
 .../apache/flink/orc/OrcRowInputFormatTest.java |   795 ++
 .../apache/flink/orc/OrcTableSourceITCase.java  |   134 +-
 .../apache/flink/orc/OrcTableSourceTest.java    |   266 +-
 .../java/org/apache/flink/orc/OrcUtilsTest.java |   148 +
 .../apache/flink/orc/RowOrcInputFormatTest.java |   472 -
 .../test/resources/TestOrcFile.emptyFile.orc    |   Bin 523 -> 0 bytes
 .../TestOrcFile.listliststructlong.orc          |   Bin 845 -> 0 bytes
 .../src/test/resources/TestOrcFile.listlong.orc |   Bin 627 -> 0 bytes
 .../test/resources/TestOrcFile.liststring.orc   |   Bin 1298 -> 0 bytes
 .../src/test/resources/TestOrcFile.test1.orc    |   Bin 1711 -> 0 bytes
 .../test/resources/TestOrcFile.testDate1900.dat | 10000 -----------------
 .../test/resources/TestOrcFile.testDate1900.orc |   Bin 30941 -> 0 bytes
 .../flink-orc/src/test/resources/decimal.dat    |  6000 ----------
 .../flink-orc/src/test/resources/decimal.orc    |   Bin 16337 -> 0 bytes
 .../src/test/resources/demo-11-none.orc         |   Bin 5147970 -> 0 bytes
 .../src/test/resources/test-data-decimal.orc    |   Bin 0 -> 16337 bytes
 .../src/test/resources/test-data-flat.orc       |   Bin 0 -> 408522 bytes
 .../src/test/resources/test-data-nested.orc     |   Bin 0 -> 1711 bytes
 .../src/test/resources/test-data-nestedlist.orc |   Bin 0 -> 845 bytes
 .../src/test/resources/test-data-timetypes.orc  |   Bin 0 -> 30941 bytes
 .../flink/api/java/typeutils/RowTypeInfo.java   |    17 +
 .../logical/FlinkLogicalTableSourceScan.scala   |    16 +-
 .../table/plan/util/RexProgramExtractor.scala   |    12 +
 29 files changed, 3306 insertions(+), 18512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 0b4bdbe..7387358 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -47,6 +47,7 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or
 | `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics.
 | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.8 topics.
 | `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files.
+| `OrcTableSource` | `flink-orc` | Y | N | A `TableSource` for ORC files.
 
 All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
 
@@ -485,6 +486,54 @@ val csvTableSource = CsvTableSource
 
 {% top %}
 
+### OrcTableSource
+
+The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down.
+
+An `OrcTableSource` is created as shown below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// create Hadoop Configuration
+Configuration config = new Configuration();
+
+OrcTableSource orcTableSource = OrcTableSource.builder()
+  // path to ORC file(s)
+  .path("file:///path/to/data")
+  // schema of ORC files
+  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+  // Hadoop configuration
+  .withConfiguration(config)
+  // build OrcTableSource
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// create Hadoop Configuration
+val config = new Configuration()
+
+val orcTableSource = OrcTableSource.builder()
+  // path to ORC file(s)
+  .path("file:///path/to/data")
+  // schema of ORC files
+  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+  // Hadoop configuration
+  .withConfiguration(config)
+  // build OrcTableSource
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** The `OrcTableSource` does not support ORC's `Union` type yet.
+
+{% top %}
+
 Provided TableSinks
 -------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/pom.xml b/flink-connectors/flink-orc/pom.xml
index 1ac7eaa..3ee5e49 100644
--- a/flink-connectors/flink-orc/pom.xml
+++ b/flink-connectors/flink-orc/pom.xml
@@ -25,7 +25,7 @@ under the License.
 	<parent>
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-connectors</artifactId>
-		<version>1.4-SNAPSHOT</version>
+		<version>1.5-SNAPSHOT</version>
 		<relativePath>..</relativePath>
 	</parent>
 
@@ -40,22 +40,39 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<scope>compile</scope>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
-			<scope>compile</scope>
+			<scope>provided</scope>
+			<!-- Projects depending on this project, won't depend on flink-table. -->
+			<optional>true</optional>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.orc</groupId>
 			<artifactId>orc-core</artifactId>
-			<version>1.4.0</version>
+			<version>1.4.1</version>
+			<exclusions>
+				<!-- Exclude ORC's Hadoop dependency and pull in Flink's shaded Hadoop. -->
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- Replacement for ORC's Hadoop dependency. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>
 
 		<!-- test dependencies -->
@@ -88,65 +105,7 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-	</dependencies>
 
-	<build>
-
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-assembly-plugin</artifactId>
-										<versionRange>[2.4,)</versionRange>
-										<goals>
-											<goal>single</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-clean-plugin</artifactId>
-										<versionRange>[1,)</versionRange>
-										<goals>
-											<goal>clean</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.avro</groupId>
-										<artifactId>avro-maven-plugin</artifactId>
-										<versionRange>[1.7.7,)</versionRange>
-										<goals>
-											<goal>schema</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-	</build>
+	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
new file mode 100644
index 0000000..4353cbc
--- /dev/null
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class);
+	// the number of rows read in a batch
+	private static final int DEFAULT_BATCH_SIZE = 1000;
+
+	// the number of fields rows to read in a batch
+	private int batchSize;
+	// the configuration to read with
+	private Configuration conf;
+	// the schema of the ORC files to read
+	private TypeDescription schema;
+
+	// the fields of the ORC schema that the returned Rows are composed of.
+	private int[] selectedFields;
+	// the type information of the Rows returned by this InputFormat.
+	private transient RowTypeInfo rowType;
+
+	// the ORC reader
+	private transient RecordReader orcRowsReader;
+	// the vectorized row data to be read in a batch
+	private transient VectorizedRowBatch rowBatch;
+	// the vector of rows that is read in a batch
+	private transient Row[] rows;
+
+	// the number of rows in the current batch
+	private transient int rowsInBatch;
+	// the index of the next row to return
+	private transient int nextRow;
+
+	private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+	/**
+	 * Creates an OrcRowInputFormat.
+	 *
+	 * @param path The path to read ORC files from.
+	 * @param schemaString The schema of the ORC files as String.
+	 * @param orcConfig The configuration to read the ORC files with.
+	 */
+	public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) {
+		this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE);
+	}
+
+	/**
+	 * Creates an OrcRowInputFormat.
+	 *
+	 * @param path The path to read ORC files from.
+	 * @param schemaString The schema of the ORC files as String.
+	 * @param orcConfig The configuration to read the ORC files with.
+	 * @param batchSize The number of Row objects to read in a batch.
+	 */
+	public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig, int batchSize) {
+		this(path, TypeDescription.fromString(schemaString), orcConfig, batchSize);
+	}
+
+	/**
+	 * Creates an OrcRowInputFormat.
+	 *
+	 * @param path The path to read ORC files from.
+	 * @param orcSchema The schema of the ORC files as ORC TypeDescription.
+	 * @param orcConfig The configuration to read the ORC files with.
+	 * @param batchSize The number of Row objects to read in a batch.
+	 */
+	public OrcRowInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
+		super(new Path(path));
+
+		// configure OrcRowInputFormat
+		this.schema = orcSchema;
+		this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+		this.conf = orcConfig;
+		this.batchSize = batchSize;
+
+		// set default selection mask, i.e., all fields.
+		this.selectedFields = new int[this.schema.getChildren().size()];
+		for (int i = 0; i < selectedFields.length; i++) {
+			this.selectedFields[i] = i;
+		}
+	}
+
+	/**
+	 * Adds a filter predicate to reduce the number of rows to be returned by the input format.
+	 * Multiple conjunctive predicates can be added by calling this method multiple times.
+	 *
+	 * <p>Note: Predicates can significantly reduce the amount of data that is read.
+	 * However, the OrcRowInputFormat does not guarantee that all returned rows qualify the
+	 * predicates. Moreover, predicates are only applied if the referenced field is among the
+	 * selected fields.
+	 *
+	 * @param predicate The filter predicate.
+	 */
+	public void addPredicate(Predicate predicate) {
+		// validate
+		validatePredicate(predicate);
+		// add predicate
+		this.conjunctPredicates.add(predicate);
+	}
+
+	private void validatePredicate(Predicate pred) {
+		if (pred instanceof ColumnPredicate) {
+			// check column name
+			String colName = ((ColumnPredicate) pred).columnName;
+			if (!this.schema.getFieldNames().contains(colName)) {
+				throw new IllegalArgumentException("Predicate cannot be applied. " +
+					"Column '" + colName + "' does not exist in ORC schema.");
+			}
+		} else if (pred instanceof Not) {
+			validatePredicate(((Not) pred).child());
+		} else if (pred instanceof Or) {
+			for (Predicate p : ((Or) pred).children()) {
+				validatePredicate(p);
+			}
+		}
+	}
+
+	/**
+	 * Selects the fields from the ORC schema that are returned by InputFormat.
+	 *
+	 * @param selectedFields The indices of the fields of the ORC schema that are returned by the InputFormat.
+	 */
+	public void selectFields(int... selectedFields) {
+		// set field mapping
+		this.selectedFields = selectedFields;
+		// adapt result type
+		this.rowType = RowTypeInfo.projectFields(this.rowType, selectedFields);
+	}
+
+	/**
+	 * Computes the ORC projection mask of the fields to include from the selected fields.rowOrcInputFormat.nextRecord(null).
+	 *
+	 * @return The ORC projection mask.
+	 */
+	private boolean[] computeProjectionMask() {
+		// mask with all fields of the schema
+		boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
+		// for each selected field
+		for (int inIdx : selectedFields) {
+			// set all nested fields of a selected field to true
+			TypeDescription fieldSchema = schema.getChildren().get(inIdx);
+			for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
+				projectionMask[i] = true;
+			}
+		}
+		return projectionMask;
+	}
+
+	@Override
+	public void openInputFormat() throws IOException {
+		super.openInputFormat();
+		// create and initialize the row batch
+		this.rows = new Row[batchSize];
+		for (int i = 0; i < batchSize; i++) {
+			rows[i] = new Row(selectedFields.length);
+		}
+	}
+
+	@Override
+	public void open(FileInputSplit fileSplit) throws IOException {
+
+		LOG.debug("Opening ORC file {}", fileSplit.getPath());
+
+		// open ORC file and create reader
+		org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
+		Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(conf));
+
+		// get offset and length for the stripes that start in the split
+		Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit(fileSplit, getStripes(orcReader));
+
+		// create ORC row reader configuration
+		Reader.Options options = getOptions(orcReader)
+			.schema(schema)
+			.range(offsetAndLength.f0, offsetAndLength.f1)
+			.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+			.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+			.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+		// configure filters
+		if (!conjunctPredicates.isEmpty()) {
+			SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+			b = b.startAnd();
+			for (Predicate predicate : conjunctPredicates) {
+				predicate.add(b);
+			}
+			b = b.end();
+			options.searchArgument(b.build(), new String[]{});
+		}
+
+		// configure selected fields
+		options.include(computeProjectionMask());
+
+		// create ORC row reader
+		this.orcRowsReader = orcReader.rows(options);
+
+		// assign ids
+		this.schema.getId();
+		// create row batch
+		this.rowBatch = schema.createRowBatch(batchSize);
+		rowsInBatch = 0;
+		nextRow = 0;
+	}
+
+	@VisibleForTesting
+	Reader.Options getOptions(Reader orcReader) {
+		return orcReader.options();
+	}
+
+	@VisibleForTesting
+	List<StripeInformation> getStripes(Reader orcReader) {
+		return orcReader.getStripes();
+	}
+
+	private Tuple2<Long, Long> getOffsetAndLengthForSplit(FileInputSplit split, List<StripeInformation> stripes) {
+		long splitStart = split.getStart();
+		long splitEnd = splitStart + split.getLength();
+
+		long readStart = Long.MAX_VALUE;
+		long readEnd = Long.MIN_VALUE;
+
+		for (StripeInformation s : stripes) {
+			if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) {
+				// stripe starts in split, so it is included
+				readStart = Math.min(readStart, s.getOffset());
+				readEnd = Math.max(readEnd, s.getOffset() + s.getLength());
+			}
+		}
+
+		if (readStart < Long.MAX_VALUE) {
+			// at least one split is included
+			return Tuple2.of(readStart, readEnd - readStart);
+		} else {
+			return Tuple2.of(0L, 0L);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (orcRowsReader != null) {
+			this.orcRowsReader.close();
+		}
+		this.orcRowsReader = null;
+	}
+
+	@Override
+	public void closeInputFormat() throws IOException {
+		this.rows = null;
+		this.rows = null;
+		this.schema = null;
+		this.rowBatch = null;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !ensureBatch();
+	}
+
+	/**
+	 * Checks if there is at least one row left in the batch to return.
+	 * If no more row are available, it reads another batch of rows.
+	 *
+	 * @return Returns true if there is one more row to return, false otherwise.
+	 * @throws IOException throw if an exception happens while reading a batch.
+	 */
+	private boolean ensureBatch() throws IOException {
+
+		if (nextRow >= rowsInBatch) {
+			// No more rows available in the Rows array.
+			nextRow = 0;
+			// Try to read the next batch if rows from the ORC file.
+			boolean moreRows = orcRowsReader.nextBatch(rowBatch);
+
+			if (moreRows) {
+				// Load the data into the Rows array.
+				rowsInBatch = fillRows(rows, schema, rowBatch, selectedFields);
+			}
+			return moreRows;
+		}
+		// there is at least one Row left in the Rows array.
+		return true;
+	}
+
+	@Override
+	public Row nextRecord(Row reuse) throws IOException {
+		// return the next row
+		return rows[this.nextRow++];
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return rowType;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(batchSize);
+		this.conf.write(out);
+		out.writeUTF(schema.toString());
+
+		out.writeInt(selectedFields.length);
+		for (int f : selectedFields) {
+			out.writeInt(f);
+		}
+
+		out.writeInt(conjunctPredicates.size());
+		for (Predicate p : conjunctPredicates) {
+			out.writeObject(p);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		batchSize = in.readInt();
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+
+		if (this.conf == null) {
+			this.conf = configuration;
+		}
+		this.schema = TypeDescription.fromString(in.readUTF());
+
+		this.selectedFields = new int[in.readInt()];
+		for (int i = 0; i < selectedFields.length; i++) {
+			this.selectedFields[i] = in.readInt();
+		}
+
+		this.conjunctPredicates = new ArrayList<>();
+		int numPreds = in.readInt();
+		for (int i = 0; i < numPreds; i++) {
+			conjunctPredicates.add((Predicate) in.readObject());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Classes to define predicates
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A filter predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public abstract static class Predicate implements Serializable {
+		protected abstract SearchArgument.Builder add(SearchArgument.Builder builder);
+	}
+
+	abstract static class ColumnPredicate extends Predicate {
+		final String columnName;
+		final PredicateLeaf.Type literalType;
+
+		ColumnPredicate(String columnName, PredicateLeaf.Type literalType) {
+			this.columnName = columnName;
+			this.literalType = literalType;
+		}
+
+		Object castLiteral(Serializable literal) {
+
+			switch (literalType) {
+				case LONG:
+					if (literal instanceof Byte) {
+						return new Long((Byte) literal);
+					} else if (literal instanceof Short) {
+						return new Long((Short) literal);
+					} else if (literal instanceof Integer) {
+						return new Long((Integer) literal);
+					} else if (literal instanceof Long) {
+						return literal;
+					} else {
+						throw new IllegalArgumentException("A predicate on a LONG column requires an integer " +
+							"literal, i.e., Byte, Short, Integer, or Long.");
+					}
+				case FLOAT:
+					if (literal instanceof Float) {
+						return new Double((Float) literal);
+					} else if (literal instanceof Double) {
+						return literal;
+					} else if (literal instanceof BigDecimal) {
+						return ((BigDecimal) literal).doubleValue();
+					} else {
+						throw new IllegalArgumentException("A predicate on a FLOAT column requires a floating " +
+							"literal, i.e., Float or Double.");
+					}
+				case STRING:
+					if (literal instanceof String) {
+						return literal;
+					} else {
+						throw new IllegalArgumentException("A predicate on a STRING column requires a floating " +
+							"literal, i.e., Float or Double.");
+					}
+				case BOOLEAN:
+					if (literal instanceof Boolean) {
+						return literal;
+					} else {
+						throw new IllegalArgumentException("A predicate on a BOOLEAN column requires a Boolean literal.");
+					}
+				case DATE:
+					if (literal instanceof Date) {
+						return literal;
+					} else {
+						throw new IllegalArgumentException("A predicate on a DATE column requires a java.sql.Date literal.");
+					}
+				case TIMESTAMP:
+					if (literal instanceof Timestamp) {
+						return literal;
+					} else {
+						throw new IllegalArgumentException("A predicate on a TIMESTAMP column requires a java.sql.Timestamp literal.");
+					}
+				case DECIMAL:
+					if (literal instanceof BigDecimal) {
+						return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal));
+					} else {
+						throw new IllegalArgumentException("A predicate on a DECIMAL column requires a BigDecimal literal.");
+					}
+				default:
+					throw new IllegalArgumentException("Unknown literal type " + literalType);
+			}
+		}
+	}
+
+	abstract static class BinaryPredicate extends ColumnPredicate {
+		final Serializable literal;
+
+		BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+			super(columnName, literalType);
+			this.literal = literal;
+		}
+	}
+
+	/**
+	 * An EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class Equals extends BinaryPredicate {
+		/**
+		 * Creates an EQUALS predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literal.
+		 * @param literal The literal value to check the column against.
+		 */
+		public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+			super(columnName, literalType, literal);
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.equals(columnName, literalType, castLiteral(literal));
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " = " + literal;
+		}
+	}
+
+	/**
+	 * An EQUALS predicate that can be evaluated with Null safety by the OrcRowInputFormat.
+	 */
+	public static class NullSafeEquals extends BinaryPredicate {
+		/**
+		 * Creates a null-safe EQUALS predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literal.
+		 * @param literal The literal value to check the column against.
+		 */
+		public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+			super(columnName, literalType, literal);
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.nullSafeEquals(columnName, literalType, castLiteral(literal));
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " = " + literal;
+		}
+	}
+
+	/**
+	 * A LESS_THAN predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class LessThan extends BinaryPredicate {
+		/**
+		 * Creates a LESS_THAN predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literal.
+		 * @param literal The literal value to check the column against.
+		 */
+		public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+			super(columnName, literalType, literal);
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.lessThan(columnName, literalType, castLiteral(literal));
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " < " + literal;
+		}
+	}
+
+	/**
+	 * A LESS_THAN_EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class LessThanEquals extends BinaryPredicate {
+		/**
+		 * Creates a LESS_THAN_EQUALS predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literal.
+		 * @param literal The literal value to check the column against.
+		 */
+		public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) {
+			super(columnName, literalType, literal);
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.lessThanEquals(columnName, literalType, castLiteral(literal));
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " <= " + literal;
+		}
+	}
+
+	/**
+	 * An IS_NULL predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class IsNull extends ColumnPredicate {
+		/**
+		 * Creates an IS_NULL predicate.
+		 *
+		 * @param columnName The column to check for null.
+		 * @param literalType The type of the column to check for null.
+		 */
+		public IsNull(String columnName, PredicateLeaf.Type literalType) {
+			super(columnName, literalType);
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.isNull(columnName, literalType);
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " IS NULL";
+		}
+	}
+
+	/**
+	 * An BETWEEN predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class Between extends ColumnPredicate {
+		private Serializable lowerBound;
+		private Serializable upperBound;
+
+		/**
+		 * Creates an BETWEEN predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literals.
+		 * @param lowerBound The literal value of the (inclusive) lower bound to check the column against.
+		 * @param upperBound The literal value of the (inclusive) upper bound to check the column against.
+		 */
+		public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound) {
+			super(columnName, literalType);
+			this.lowerBound = lowerBound;
+			this.upperBound = upperBound;
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return builder.between(columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound));
+		}
+
+		@Override
+		public String toString() {
+			return lowerBound + " <= " + columnName + " <= " + upperBound;
+		}
+	}
+
+	/**
+	 * An IN predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class In extends ColumnPredicate {
+		private Serializable[] literals;
+
+		/**
+		 * Creates an IN predicate.
+		 *
+		 * @param columnName The column to check.
+		 * @param literalType The type of the literals.
+		 * @param literals The literal values to check the column against.
+		 */
+		public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals) {
+			super(columnName, literalType);
+			this.literals = literals;
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			Object[] castedLiterals = new Object[literals.length];
+			for (int i = 0; i < literals.length; i++) {
+				castedLiterals[i] = castLiteral(literals[i]);
+			}
+			return builder.in(columnName, literalType, (Object[]) castedLiterals);
+		}
+
+		@Override
+		public String toString() {
+			return columnName + " IN " + Arrays.toString(literals);
+		}
+	}
+
+	/**
+	 * A NOT predicate to negate a predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class Not extends Predicate {
+		private final Predicate pred;
+
+		/**
+		 * Creates a NOT predicate.
+		 *
+		 * @param predicate The predicate to negate.
+		 */
+		public Not(Predicate predicate) {
+			this.pred = predicate;
+		}
+
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			return pred.add(builder.startNot()).end();
+		}
+
+		protected Predicate child() {
+			return pred;
+		}
+
+		@Override
+		public String toString() {
+			return "NOT(" + pred.toString() + ")";
+		}
+	}
+
+	/**
+	 * An OR predicate that can be evaluated by the OrcRowInputFormat.
+	 */
+	public static class Or extends Predicate {
+		private final Predicate[] preds;
+
+		/**
+		 * Creates an OR predicate.
+		 *
+		 * @param predicates The disjunctive predicates.
+		 */
+		public Or(Predicate... predicates) {
+			this.preds = predicates;
+		}
+
+		@Override
+		protected SearchArgument.Builder add(SearchArgument.Builder builder) {
+			SearchArgument.Builder withOr = builder.startOr();
+			for (Predicate p : preds) {
+				withOr = p.add(withOr);
+			}
+			return withOr.end();
+		}
+
+		protected Iterable<Predicate> children() {
+			return Arrays.asList(preds);
+		}
+
+		@Override
+		public String toString() {
+			return "OR(" + Arrays.toString(preds) + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 0454ba4..b7c5378 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -18,111 +18,474 @@
 
 package org.apache.flink.orc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.orc.OrcRowInputFormat.Predicate;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.IsNotNull;
+import org.apache.flink.table.expressions.IsNull;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.expressions.UnaryExpression;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.orc.TypeDescription;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
- * Creates a TableSource to read ORC file.
+ * A TableSource to read ORC files.
  *
- * <p>The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ * <p>The {@link OrcTableSource} supports projection and filter push-down.</p>
  *
- * <p>The OrcTableSource is used as shown in the example below.
+ * <p>An {@link OrcTableSource} is used as shown in the example below.
  *
  * <pre>
  * {@code
- * String path = testInputURL.getPath();
- * String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>"
- * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * OrcTableSource orcSrc = OrcTableSource.builder()
+ *   .path("file:///my/data/file.orc")
+ *   .forOrcSchema("struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>")
+ *   .build();
+ *
  * tEnv.registerTableSource("orcTable", orcSrc);
  * Table res = tableEnv.sql("SELECT * FROM orcTable");
  * }
  * </pre>
  */
-public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+public class OrcTableSource
+	implements BatchTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
 
-	private String path;
-	private TypeDescription orcSchema;
-	private RowTypeInfo typeInfo;
-	private Configuration orcConfig;
-	private int[] fieldMapping;
+	private static final int DEFAULT_BATCH_SIZE = 1000;
 
-	/**
-	 * The ORC file path and schema.
-	 *
-	 * @param path      the path of orc file
-	 * @param orcSchema schema of orc file
-	 */
-	public OrcTableSource(String path, String orcSchema) {
-		this(path, orcSchema, new Configuration());
-	}
+	// path to read ORC files from
+	private final String path;
+	// schema of the ORC file
+	private final TypeDescription orcSchema;
+	// the schema of the Table
+	private final TableSchema tableSchema;
+	// the configuration to read the file
+	private final Configuration orcConfig;
+	// the number of rows to read in a batch
+	private final int batchSize;
+
+	// type information of the data returned by the InputFormat
+	private final RowTypeInfo typeInfo;
+	// list of selected ORC fields to return
+	private final int[] selectedFields;
+	// list of predicates to apply
+	private final Predicate[] predicates;
 
 	/**
-	 * The file path and schema of orc file, and configuration to read orc file .
+	 * Creates an OrcTableSouce from an ORC TypeDescription.
 	 *
-	 * @param path      the path of orc file
-	 * @param orcSchema schema of orc file
-	 * @param orcConfig configuration to read orc file
+	 * @param path		The path to read the ORC files from.
+	 * @param orcSchema The schema of the ORC files as TypeDescription.
+	 * @param orcConfig The configuration to read the ORC files.
+	 * @param batchSize The number of Rows to read in a batch, default is 1000.
 	 */
-	public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
-		this(path, TypeDescription.fromString(orcSchema), orcConfig);
+	private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
+		this(path, orcSchema, orcConfig, batchSize, null, null);
 	}
 
-	public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
+	private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig,
+							int batchSize, int[] selectedFields, Predicate[] predicates) {
+
+		Preconditions.checkNotNull(path, "Path must not be null.");
+		Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null.");
+		Preconditions.checkNotNull(path, "Configuration must not be null.");
+		Preconditions.checkArgument(batchSize > 0, "Batch size must be larger than null.");
 		this.path = path;
 		this.orcSchema = orcSchema;
 		this.orcConfig = orcConfig;
+		this.batchSize = batchSize;
+		this.selectedFields = selectedFields;
+		this.predicates = predicates;
 
-		this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+		// determine the type information from the ORC schema
+		RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+		// set return type info
+		if (selectedFields == null) {
+			this.typeInfo = typeInfoFromSchema;
+		} else {
+			this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+		}
 
+		// create a TableSchema that corresponds to the ORC schema
+		this.tableSchema = new TableSchema(
+			typeInfoFromSchema.getFieldNames(),
+			typeInfoFromSchema.getFieldTypes()
+		);
 	}
 
 	@Override
 	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-
-		RowOrcInputFormat orcIF = new RowOrcInputFormat(path, orcSchema, orcConfig);
-		if (fieldMapping != null) {
-			orcIF.setFieldMapping(fieldMapping);
+		OrcRowInputFormat orcIF = buildOrcInputFormat();
+		if (selectedFields != null) {
+			orcIF.selectFields(selectedFields);
+		}
+		if (predicates != null) {
+			for (OrcRowInputFormat.Predicate pred : predicates) {
+				orcIF.addPredicate(pred);
+			}
 		}
 		return execEnv.createInput(orcIF);
 	}
 
+	@VisibleForTesting
+	protected OrcRowInputFormat buildOrcInputFormat() {
+		return new OrcRowInputFormat(path, orcSchema, orcConfig, batchSize);
+	}
+
 	@Override
 	public TypeInformation<Row> getReturnType() {
 		return typeInfo;
 	}
 
 	@Override
-	public TableSource<Row> projectFields(int[] fields) {
+	public TableSchema getTableSchema() {
+		return this.tableSchema;
+	}
 
-		OrcTableSource copy = new OrcTableSource(path, orcSchema, orcConfig);
+	@Override
+	public TableSource<Row> projectFields(int[] selectedFields) {
+		// create a copy of the OrcTableSouce with new selected fields
+		return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates);
+	}
 
-		// set field mapping
-		copy.fieldMapping = fields;
+	@Override
+	public TableSource<Row> applyPredicate(List<Expression> predicates) {
+		ArrayList<Predicate> orcPredicates = new ArrayList<>();
 
-		// adapt TypeInfo
-		TypeInformation[] fieldTypes = new TypeInformation[fields.length];
-		String[] fieldNames = new String[fields.length];
-		for (int i = 0; i < fields.length; i++) {
-			fieldTypes[i] = this.typeInfo.getTypeAt(fields[i]);
-			fieldNames[i] = this.typeInfo.getFieldNames()[fields[i]];
+		// we do not remove any predicates from the list because ORC does not fully apply predicates
+		for (Expression pred : predicates) {
+			Predicate orcPred = toOrcPredicate(pred);
+			if (orcPred != null) {
+				orcPredicates.add(orcPred);
+			}
 		}
-		copy.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
 
-		return copy;
+		return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
+	}
+
+	@Override
+	public boolean isFilterPushedDown() {
+		return this.predicates != null;
 	}
 
 	@Override
 	public String explainSource() {
-		return "ORC Source file at path " + this.path + " with schema " + this.orcSchema;
+		return "OrcFile[path=" + path + ", schema=" + orcSchema + ", filter=" + predicateString() + "]";
+	}
+
+	private String predicateString() {
+		if (predicates != null) {
+			return "AND(" + Arrays.toString(predicates) + ")";
+		} else {
+			return "TRUE";
+		}
+	}
+
+	// Predicate conversion for filter push-down.
+
+	private Predicate toOrcPredicate(Expression pred) {
+		if (pred instanceof Or) {
+			Predicate c1 = toOrcPredicate(((Or) pred).left());
+			Predicate c2 = toOrcPredicate(((Or) pred).right());
+			if (c1 == null || c2 == null) {
+				return null;
+			} else {
+				return new OrcRowInputFormat.Or(c1, c2);
+			}
+		} else if (pred instanceof Not) {
+			Predicate c = toOrcPredicate(((Not) pred).child());
+			if (c == null) {
+				return null;
+			} else {
+				return new OrcRowInputFormat.Not(c);
+			}
+		} else if (pred instanceof BinaryComparison) {
+
+			BinaryComparison binComp = (BinaryComparison) pred;
+
+			if (!isValid(binComp)) {
+				// not a valid predicate
+				return null;
+			}
+			PredicateLeaf.Type litType = getLiteralType(binComp);
+			if (litType == null) {
+				// unsupported literal type
+				return null;
+			}
+
+			boolean literalOnRight = literalOnRight(binComp);
+			String colName = getColumnName(binComp);
+			Serializable literal = (Serializable) getLiteral(binComp);
+
+			if (pred instanceof EqualTo) {
+				return new OrcRowInputFormat.Equals(colName, litType, literal);
+			} else if (pred instanceof NotEqualTo) {
+				return new OrcRowInputFormat.Not(
+					new OrcRowInputFormat.Equals(colName, litType, literal));
+			} else if (pred instanceof GreaterThan) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+				} else {
+					return new OrcRowInputFormat.LessThan(colName, litType, literal);
+				}
+			} else if (pred instanceof GreaterThanOrEqual) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThan(colName, litType, literal));
+				} else {
+					return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+				}
+			} else if (pred instanceof LessThan) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.LessThan(colName, litType, literal);
+				} else {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+				}
+			} else if (pred instanceof LessThanOrEqual) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+				} else {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThan(colName, litType, literal));
+				}
+			} else {
+				// unsupported predicate
+				return null;
+			}
+		} else if (pred instanceof UnaryExpression) {
+
+			UnaryExpression unary = (UnaryExpression) pred;
+			if (!isValid(unary)) {
+				// not a valid predicate
+				return null;
+			}
+			PredicateLeaf.Type colType = toOrcType(((UnaryExpression) pred).child().resultType());
+			if (colType == null) {
+				// unsupported type
+				return null;
+			}
+
+			String colName = getColumnName(unary);
+
+			if (pred instanceof IsNull) {
+				return new OrcRowInputFormat.IsNull(colName, colType);
+			} else if (pred instanceof IsNotNull) {
+				return new OrcRowInputFormat.Not(
+					new OrcRowInputFormat.IsNull(colName, colType));
+			} else {
+				// unsupported predicate
+				return null;
+			}
+		} else {
+			// unsupported predicate
+			return null;
+		}
+	}
+
+	private boolean isValid(UnaryExpression unary) {
+		return unary.child() instanceof Attribute;
+	}
+
+	private boolean isValid(BinaryComparison comp) {
+		return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+			(comp.left() instanceof Attribute && comp.right() instanceof Literal);
+	}
+
+	private boolean literalOnRight(BinaryComparison comp) {
+		if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+			return false;
+		} else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+			return true;
+		} else {
+			throw new RuntimeException("Invalid binary comparison.");
+		}
+	}
+
+	private String getColumnName(UnaryExpression unary) {
+		return ((Attribute) unary.child()).name();
+	}
+
+	private String getColumnName(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Attribute) comp.left()).name();
+		} else {
+			return ((Attribute) comp.right()).name();
+		}
+	}
+
+	private PredicateLeaf.Type getLiteralType(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return toOrcType(((Literal) comp.right()).resultType());
+		} else {
+			return toOrcType(((Literal) comp.left()).resultType());
+		}
+	}
+
+	private Object getLiteral(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).value();
+		} else {
+			return ((Literal) comp.left()).value();
+		}
+	}
+
+	private PredicateLeaf.Type toOrcType(TypeInformation<?> type) {
+		if (type == BasicTypeInfo.BYTE_TYPE_INFO ||
+			type == BasicTypeInfo.SHORT_TYPE_INFO ||
+			type == BasicTypeInfo.INT_TYPE_INFO ||
+			type == BasicTypeInfo.LONG_TYPE_INFO) {
+			return PredicateLeaf.Type.LONG;
+		} else if (type == BasicTypeInfo.FLOAT_TYPE_INFO ||
+			type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+			return PredicateLeaf.Type.FLOAT;
+		} else if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+			return PredicateLeaf.Type.BOOLEAN;
+		} else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+			return PredicateLeaf.Type.STRING;
+		} else if (type == SqlTimeTypeInfo.TIMESTAMP) {
+			return PredicateLeaf.Type.TIMESTAMP;
+		} else if (type == SqlTimeTypeInfo.DATE) {
+			return PredicateLeaf.Type.DATE;
+		} else if (type == BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+			return PredicateLeaf.Type.DECIMAL;
+		} else {
+			// unsupported type
+			return null;
+		}
+	}
+
+	// Builder
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Constructs an {@link OrcTableSource}.
+	 */
+	public static class Builder {
+
+		private String path;
+
+		private TypeDescription schema;
+
+		private Configuration config;
+
+		private int batchSize = 0;
+
+		/**
+		 * Sets the path of the ORC file(s).
+		 *
+		 * @param path The path of the ORC file(s).
+		 * @return The builder.
+		 */
+		public Builder path(String path) {
+			Preconditions.checkNotNull(path, "Path must not be null.");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty.");
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the ORC schema of the files to read as a String.
+		 *
+		 * @param orcSchema The ORC schema of the files to read as a String.
+		 * @return The builder.
+		 */
+		public Builder forOrcSchema(String orcSchema) {
+			Preconditions.checkNotNull(orcSchema, "ORC schema must not be null.");
+			this.schema = TypeDescription.fromString(orcSchema);
+			return this;
+		}
+
+		/**
+		 * Sets the ORC schema of the files to read as a {@link TypeDescription}.
+		 *
+		 * @param orcSchema The ORC schema of the files to read as a String.
+		 * @return The builder.
+		 */
+		public Builder forOrcSchema(TypeDescription orcSchema) {
+			Preconditions.checkNotNull(orcSchema, "ORC Schema must not be null.");
+			this.schema = orcSchema;
+			return this;
+		}
+
+		/**
+		 * Sets a Hadoop {@link Configuration} for the ORC reader. If no configuration is configured,
+		 * an empty configuration is used.
+		 *
+		 * @param config The Hadoop Configuration for the ORC reader.
+		 * @return The builder.
+		 */
+		public Builder withConfiguration(Configuration config) {
+			Preconditions.checkNotNull(config, "Configuration must not be null.");
+			this.config = config;
+			return this;
+		}
+
+		/**
+		 * Sets the number of rows that are read in a batch. If not configured, the ORC files are
+		 * read with a batch size of 1000.
+		 *
+		 * @param batchSize The number of rows that are read in a batch.
+		 * @return The builder.
+		 */
+		public Builder withBatchSize(int batchSize) {
+			Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than zero.");
+			this.batchSize = batchSize;
+			return this;
+		}
+
+		/**
+		 * Builds the OrcTableSource for this builder.
+		 *
+		 * @return The OrcTableSource for this builder.
+		 */
+		public OrcTableSource build() {
+			Preconditions.checkNotNull(this.path, "Path must not be null.");
+			Preconditions.checkNotNull(this.schema, "ORC schema must not be null.");
+			if (this.config == null) {
+				this.config = new Configuration();
+			}
+			if (this.batchSize == 0) {
+				// set default batch size
+				this.batchSize = DEFAULT_BATCH_SIZE;
+			}
+			return new OrcTableSource(this.path, this.schema, this.config, this.batchSize);
+		}
+
 	}
 
 }