Posted to commits@flink.apache.org by fh...@apache.org on 2019/07/10 16:42:44 UTC

[flink] branch master updated: [FLINK-7244][parquet] Add ParquetTableSource.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e5e81  [FLINK-7244][parquet] Add ParquetTableSource.
38e5e81 is described below

commit 38e5e8161a9c763cf7df3b642830b5a97371bb00
Author: Peter Huang <hu...@gmail.com>
AuthorDate: Sun Mar 24 23:19:18 2019 -0700

    [FLINK-7244][parquet] Add ParquetTableSource.
    
    This closes #8064.
---
 flink-formats/flink-parquet/pom.xml                |  28 +-
 .../flink/formats/parquet/ParquetInputFormat.java  |  16 +-
 .../formats/parquet/ParquetRowInputFormat.java     |   4 +-
 .../flink/formats/parquet/ParquetTableSource.java  | 568 +++++++++++++++++++++
 .../parquet/utils/ParquetSchemaConverter.java      |   2 +-
 .../formats/parquet/ParquetMapInputFormatTest.java |   2 +-
 .../formats/parquet/ParquetTableSourceITCase.java  | 116 +++++
 .../formats/parquet/ParquetTableSourceTest.java    | 234 +++++++++
 .../parquet/utils/ParquetRecordReaderTest.java     |   4 +-
 .../parquet/utils/ParquetSchemaConverterTest.java  |  39 +-
 .../flink/formats/parquet/utils/TestUtil.java      |  72 ++-
 .../src/test/resources/avro/nested.avsc            |   2 +-
 12 files changed, 1041 insertions(+), 46 deletions(-)
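
For reference, a minimal usage sketch of the new table source. This is hedged: the
file path, the inline Parquet schema, and the query are illustrative assumptions for
this mail, not part of the commit itself.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.formats.parquet.ParquetTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    // Parquet schema of the files to read; a tiny hand-written schema here.
    // The tests below derive it from an Avro schema instead.
    MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message Example { required int64 foo; }");

    ParquetTableSource parquetSrc = ParquetTableSource.builder()
        .path("file:///my/data", true)          // true: recursively enumerate the directory
        .forParquetSchema(parquetSchema)
        .withConfiguration(new Configuration()) // optional; defaults to an empty Configuration
        .build();

    tEnv.registerTableSource("ParquetTable", parquetSrc);
    // Projection and filter push-down are applied by the planner automatically.
    Table result = tEnv.sqlQuery("SELECT foo FROM ParquetTable WHERE foo > 100");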

diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 4a2fd32..b43715d 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -29,7 +29,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-parquet</artifactId>
+	<artifactId>flink-parquet_${scala.binary.version}</artifactId>
 	<name>flink-parquet</name>
 
 	<packaging>jar</packaging>
@@ -39,7 +39,6 @@ under the License.
 	</properties>
 
 	<dependencies>
-
 		<!-- Flink dependencies -->
 
 		<dependency>
@@ -49,6 +48,31 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Table ecosystem -->
+		<!-- Projects depending on this project won't depend on flink-table-*. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+		<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
 		<!-- Parquet Dependencies -->
 
 		<dependency>
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
index e7484cb..554313e 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +36,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
@@ -85,6 +87,8 @@ public abstract class ParquetInputFormat<E>
 
 	private String[] fieldNames;
 
+	private FilterPredicate filterPredicate;
+
 	private transient Counter recordConsumed;
 
 	private transient MessageType expectedFileSchema;
@@ -143,6 +147,10 @@ public abstract class ParquetInputFormat<E>
 		this.fieldTypes = selectFieldTypes;
 	}
 
+	public void setFilterPredicate(FilterPredicate filterPredicate) {
+		this.filterPredicate = filterPredicate;
+	}
+
 	@Override
 	public Tuple2<Long, Long> getCurrentState() {
 		return parquetRecordReader.getCurrentReadPosition();
@@ -164,7 +172,8 @@ public abstract class ParquetInputFormat<E>
 				"Escaped the file split [%s] due to mismatch of file schema to expected result schema",
 				split.getPath().toString()));
 		} else {
-			this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+			this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema,
+				filterPredicate == null ? FilterCompat.NOOP : FilterCompat.get(filterPredicate));
 			this.parquetRecordReader.initialize(fileReader, configuration);
 			this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);
 
@@ -203,6 +212,11 @@ public abstract class ParquetInputFormat<E>
 		return fieldTypes;
 	}
 
+	@VisibleForTesting
+	protected FilterPredicate getPredicate() {
+		return this.filterPredicate;
+	}
+
 	@Override
 	public void close() throws IOException {
 		if (parquetRecordReader != null) {
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
index f010a50..13da9c7 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
@@ -31,16 +31,14 @@ import org.apache.parquet.schema.MessageType;
  */
 public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
 	private static final long serialVersionUID = 11L;
-	private RowTypeInfo returnType;
 
 	public ParquetRowInputFormat(Path path, MessageType messageType) {
 		super(path, messageType);
-		this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
 	}
 
 	@Override
 	public TypeInformation<Row> getProducedType() {
-		return returnType;
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
 	}
 
 	@Override
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
new file mode 100644
index 0000000..0b5d168
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.And;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.BinaryExpression;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A TableSource to read Parquet files.
+ *
+ * <p>The {@link ParquetTableSource} supports projection and filter push-down.
+ *
+ * <p>A {@link ParquetTableSource} is used as shown in the example below.
+ *
+ * <pre>
+ * {@code
+ * ParquetTableSource parquetSrc = ParquetTableSource.builder()
+ *   .path("file:///my/data/file.parquet")
+ *   .forParquetSchema(messageType)
+ *   .build();
+ *
+ * tEnv.registerTableSource("parquetTable", parquetSrc);
+ * Table res = tEnv.sqlQuery("SELECT * FROM parquetTable");
+ * }
+ * </pre>
+ */
+public class ParquetTableSource
+	implements BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetTableSource.class);
+
+	// path to read Parquet files from
+	private final String path;
+	// schema of the Parquet file
+	private final MessageType parquetSchema;
+	// the schema of the table
+	private final TableSchema tableSchema;
+	// the configuration to read the file
+	private final Configuration parquetConfig;
+	// type information of the data returned by the InputFormat
+	private final RowTypeInfo typeInfo;
+	// list of selected Parquet fields to return
+	@Nullable
+	private final int[] selectedFields;
+	// Parquet filter predicate to apply
+	@Nullable
+	private final FilterPredicate predicate;
+	// flag whether the path is recursively enumerated
+	private final boolean recursiveEnumeration;
+
+	private boolean isFilterPushedDown;
+
+	private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+									boolean recursiveEnumeration) {
+		this(path, parquetSchema, configuration, recursiveEnumeration, null, null);
+	}
+
+	private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+									boolean recursiveEnumeration, @Nullable int[] selectedFields, @Nullable FilterPredicate predicate) {
+		Preconditions.checkNotNull(path, "Path must not be null.");
+		Preconditions.checkNotNull(parquetSchema, "ParquetSchema must not be null.");
+		Preconditions.checkNotNull(configuration, "Configuration must not be null");
+		this.path = path;
+		this.parquetSchema = parquetSchema;
+		this.parquetConfig = configuration;
+		this.selectedFields = selectedFields;
+		this.predicate = predicate;
+		this.recursiveEnumeration = recursiveEnumeration;
+
+		if (predicate != null) {
+			this.isFilterPushedDown = true;
+		}
+		// determine the type information from the Parquet schema
+		RowTypeInfo typeInfoFromSchema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema);
+
+		// set return type info
+		if (selectedFields == null) {
+			this.typeInfo = typeInfoFromSchema;
+		} else {
+			this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+		}
+
+		// create a TableSchema that corresponds to the Parquet schema
+		this.tableSchema = new TableSchema(
+			typeInfoFromSchema.getFieldNames(),
+			typeInfoFromSchema.getFieldTypes()
+		);
+	}
+
+	@Override
+	public TableSource<Row> projectFields(int[] fields) {
+		return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null);
+	}
+
+	@Override
+	public DataSet<Row> getDataSet(ExecutionEnvironment executionEnvironment) {
+		ParquetRowInputFormat parquetRowInputFormat = new ParquetRowInputFormat(new Path(path), parquetSchema);
+		parquetRowInputFormat.setNestedFileEnumeration(recursiveEnumeration);
+		if (selectedFields != null) {
+			parquetRowInputFormat.selectFields(typeInfo.getFieldNames());
+		}
+
+		if (predicate != null) {
+			parquetRowInputFormat.setFilterPredicate(predicate);
+		}
+
+		return executionEnvironment.createInput(parquetRowInputFormat).name(explainSource());
+	}
+
+	@Override
+	public TableSource<Row> applyPredicate(List<Expression> predicates) {
+
+		// try to convert Flink filter expressions to Parquet FilterPredicates
+		List<FilterPredicate> convertedPredicates = new ArrayList<>(predicates.size());
+		List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+
+		for (Expression toConvert : predicates) {
+			FilterPredicate convertedPredicate = toParquetPredicate(toConvert);
+			if (convertedPredicate != null) {
+				convertedPredicates.add(convertedPredicate);
+			} else {
+				unsupportedExpressions.add(toConvert);
+			}
+		}
+
+		// keep only the unsupported expressions in the list handed back to the planner
+		predicates.clear();
+		predicates.addAll(unsupportedExpressions);
+
+		// construct single Parquet FilterPredicate
+		FilterPredicate parquetPredicate = null;
+		if (!convertedPredicates.isEmpty()) {
+			// concat converted predicates with AND
+			parquetPredicate = convertedPredicates.get(0);
+
+			for (FilterPredicate converted : convertedPredicates.subList(1, convertedPredicates.size())) {
+				parquetPredicate = FilterApi.and(parquetPredicate, converted);
+			}
+		}
+
+		// create and return a new ParquetTableSource with Parquet FilterPredicate
+		return new ParquetTableSource(path, parquetSchema, this.parquetConfig, recursiveEnumeration, selectedFields, parquetPredicate);
+	}
+
+	@Override
+	public boolean isFilterPushedDown() {
+		return isFilterPushedDown;
+	}
+
+	@Override
+	public TypeInformation<Row> getReturnType() {
+		return typeInfo;
+	}
+
+	@Override
+	public TableSchema getTableSchema() {
+		return tableSchema;
+	}
+
+	@Override
+	public String explainSource() {
+		return "ParquetFile[path=" + path + ", schema=" + parquetSchema + ", filter=" + predicateString()
+			+ ", typeInfo=" + typeInfo + "]";
+	}
+
+	private String predicateString() {
+		if (predicate != null) {
+			return predicate.toString();
+		} else {
+			return "TRUE";
+		}
+	}
+
+	/**
+	 * Converts a Flink {@link Expression} into a Parquet {@link FilterPredicate}.
+	 */
+	@Nullable
+	private FilterPredicate toParquetPredicate(Expression exp) {
+		if (exp instanceof Not) {
+			FilterPredicate c = toParquetPredicate(((Not) exp).child());
+			if (c == null) {
+				return null;
+			} else {
+				return FilterApi.not(c);
+			}
+		} else if (exp instanceof BinaryComparison) {
+			BinaryComparison binComp = (BinaryComparison) exp;
+
+			if (!isValid(binComp)) {
+				// unsupported literal type
+				LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+				return null;
+			}
+
+			boolean onRight = literalOnRight(binComp);
+			Tuple2<Column, Comparable> columnPair = extractColumnAndLiteral(binComp);
+
+			if (columnPair != null) {
+				if (exp instanceof EqualTo) {
+					if (columnPair.f0 instanceof IntColumn) {
+						return FilterApi.eq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+					} else if (columnPair.f0 instanceof LongColumn) {
+						return FilterApi.eq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+					} else if (columnPair.f0 instanceof DoubleColumn) {
+						return FilterApi.eq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+					} else if (columnPair.f0 instanceof FloatColumn) {
+						return FilterApi.eq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+					} else if (columnPair.f0 instanceof BooleanColumn) {
+						return FilterApi.eq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+					} else if (columnPair.f0 instanceof BinaryColumn) {
+						return FilterApi.eq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+					}
+				} else if (exp instanceof NotEqualTo) {
+					if (columnPair.f0 instanceof IntColumn) {
+						return FilterApi.notEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+					} else if (columnPair.f0 instanceof LongColumn) {
+						return FilterApi.notEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+					} else if (columnPair.f0 instanceof DoubleColumn) {
+						return FilterApi.notEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+					} else if (columnPair.f0 instanceof FloatColumn) {
+						return FilterApi.notEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+					} else if (columnPair.f0 instanceof BooleanColumn) {
+						return FilterApi.notEq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+					} else if (columnPair.f0 instanceof BinaryColumn) {
+						return FilterApi.notEq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+					}
+				} else if (exp instanceof GreaterThan) {
+					if (onRight) {
+						return greaterThan(exp, columnPair);
+					} else {
+						return lessThan(exp, columnPair);
+					}
+				} else if (exp instanceof GreaterThanOrEqual) {
+					if (onRight) {
+						return greaterThanOrEqual(exp, columnPair);
+					} else {
+						return lessThanOrEqual(exp, columnPair);
+					}
+				} else if (exp instanceof LessThan) {
+					if (onRight) {
+						return lessThan(exp, columnPair);
+					} else {
+						return greaterThan(exp, columnPair);
+					}
+				} else if (exp instanceof LessThanOrEqual) {
+					if (onRight) {
+						return lessThanOrEqual(exp, columnPair);
+					} else {
+						return greaterThanOrEqual(exp, columnPair);
+					}
+				} else {
+					// Unsupported Predicate
+					LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+					return null;
+				}
+			}
+		} else if (exp instanceof BinaryExpression) {
+			if (exp instanceof And) {
+				LOG.debug("All of the predicates should be in CNF. Found an AND expression.", exp);
+			} else if (exp instanceof Or) {
+				FilterPredicate c1 = toParquetPredicate(((Or) exp).left());
+				FilterPredicate c2 = toParquetPredicate(((Or) exp).right());
+
+				if (c1 == null || c2 == null) {
+					return null;
+				} else {
+					return FilterApi.or(c1, c2);
+				}
+			} else {
+				// Unsupported Predicate
+				LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+				return null;
+			}
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate greaterThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+		Preconditions.checkArgument(exp instanceof GreaterThan || exp instanceof LessThan, "exp has to be GreaterThan or a mirrored LessThan");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.gt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.gt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.gt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.gt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate lessThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+		Preconditions.checkArgument(exp instanceof LessThan || exp instanceof GreaterThan, "exp has to be LessThan or a mirrored GreaterThan");
+
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.lt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.lt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.lt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.lt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate greaterThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+		Preconditions.checkArgument(exp instanceof GreaterThanOrEqual || exp instanceof LessThanOrEqual, "exp has to be GreaterThanOrEqual or a mirrored LessThanOrEqual");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.gtEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.gtEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.gtEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.gtEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	@Nullable
+	private FilterPredicate lessThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+		Preconditions.checkArgument(exp instanceof LessThanOrEqual || exp instanceof GreaterThanOrEqual, "exp has to be LessThanOrEqual or a mirrored GreaterThanOrEqual");
+		if (columnPair.f0 instanceof IntColumn) {
+			return FilterApi.ltEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+		} else if (columnPair.f0 instanceof LongColumn) {
+			return FilterApi.ltEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+		} else if (columnPair.f0 instanceof DoubleColumn) {
+			return FilterApi.ltEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+		} else if (columnPair.f0 instanceof FloatColumn) {
+			return FilterApi.ltEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+		}
+
+		return null;
+	}
+
+	private boolean isValid(BinaryComparison comp) {
+		return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+			(comp.left() instanceof Attribute && comp.right() instanceof Literal);
+	}
+
+	private boolean literalOnRight(BinaryComparison comp) {
+		if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+			return false;
+		} else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+			return true;
+		} else {
+			throw new RuntimeException("Invalid binary comparison.");
+		}
+	}
+
+	private TypeInformation<?> getLiteralType(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).resultType();
+		} else {
+			return ((Literal) comp.left()).resultType();
+		}
+	}
+
+	private Object getLiteral(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).value();
+		} else {
+			return ((Literal) comp.left()).value();
+		}
+	}
+
+	private String getColumnName(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Attribute) comp.left()).name();
+		} else {
+			return ((Attribute) comp.right()).name();
+		}
+	}
+
+	@Nullable
+	private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
+		TypeInformation<?> typeInfo = getLiteralType(comp);
+		String columnName = getColumnName(comp);
+
+		// fetch the literal and validate that it is comparable
+		Object value = getLiteral(comp);
+		if (!(value instanceof Comparable)) {
+			LOG.warn("Encountered a non-comparable literal of type {}. " +
+				"Cannot push predicate [{}] into ParquetTableSource. " +
+				"This is a bug and should be reported.", value.getClass().getCanonicalName(), comp);
+			return null;
+		}
+
+		if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO ||
+			typeInfo == BasicTypeInfo.SHORT_TYPE_INFO ||
+			typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value);
+		} else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value);
+		} else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value);
+		} else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value);
+		} else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value);
+		} else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+			return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value));
+		} else {
+			// unsupported type
+			return null;
+		}
+	}
+
+	// Builder
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Constructs a {@link ParquetTableSource}.
+	 */
+	public static class Builder {
+
+		private String path;
+
+		private MessageType schema;
+
+		private Configuration config;
+
+		private boolean recursive = true;
+
+		/**
+		 * Sets the path of the Parquet files.
+		 * If the path specifies a directory, it is recursively enumerated.
+		 *
+		 * @param path the path of the Parquet files.
+		 * @return The Builder
+		 */
+		public Builder path(String path) {
+			Preconditions.checkNotNull(path, "Path must not be null");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the path of the Parquet files and whether it is recursively enumerated.
+		 *
+		 * @param path The path of the Parquet files
+		 * @param recursive Flag whether to recursively enumerate the path if it specifies a directory
+		 * @return The Builder
+		 */
+		public Builder path(String path, boolean recursive) {
+			Preconditions.checkNotNull(path, "Path must not be null");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+			this.path = path;
+			this.recursive = recursive;
+			return this;
+		}
+
+		/**
+		 * Sets the Parquet schema of the files to read.
+		 *
+		 * @param parquetSchema The Parquet schema of the files to read.
+		 * @return The Builder
+		 */
+		public Builder forParquetSchema(MessageType parquetSchema) {
+			Preconditions.checkNotNull(parquetSchema, "Parquet schema must not be null");
+			this.schema = parquetSchema;
+			return this;
+		}
+
+		/**
+		 * Sets a Hadoop {@link Configuration} for the Parquet reader. If no configuration is provided,
+		 * an empty configuration is used.
+		 *
+		 * @param config The Hadoop Configuration for the Parquet reader.
+		 * @return The Builder
+		 */
+		public Builder withConfiguration(Configuration config) {
+			Preconditions.checkNotNull(config, "Configuration must not be null.");
+			this.config = config;
+			return this;
+		}
+
+		/**
+		 * Builds the ParquetTableSource for this builder.
+		 *
+		 * @return The ParquetTableSource for this builder.
+		 */
+		public ParquetTableSource build() {
+			Preconditions.checkNotNull(path, "Path must not be null");
+			Preconditions.checkNotNull(schema, "Parquet schema must not be null");
+			if (config == null) {
+				this.config = new Configuration();
+			}
+
+			return new ParquetTableSource(this.path, this.schema, this.config, this.recursive);
+		}
+	}
+}
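
To make the filter push-down above concrete: for a conjunction such as
foo > 100 AND bar.spam = 100 (the case exercised by the new ParquetTableSourceTest
below), applyPredicate() converts each supported comparison via Parquet's FilterApi
and concatenates the results with AND. A sketch of the equivalent hand-built predicate:

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    // Equivalent of what toParquetPredicate() produces for the two supported
    // comparisons; unsupported expressions stay in the list handed back to
    // the planner and are evaluated by Flink after reading.
    FilterPredicate a = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
    FilterPredicate b = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
    FilterPredicate pushed = FilterApi.and(a, b);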
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index 35e1977..084c060 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -43,7 +43,7 @@ import java.util.List;
  * Schema converter converts Parquet schema to and from Flink internal types.
  */
 public class ParquetSchemaConverter {
-	private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class);
+	private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);
 	public static final String MAP_VALUE = "value";
 	public static final String LIST_ARRAY_TYPE = "array";
 	public static final String LIST_ELEMENT = "element";
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
index 2911166..f36b12c 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -77,7 +77,7 @@ public class ParquetMapInputFormatTest {
 		List<Map<String, String>> nestedArray = (List<Map<String, String>>) map.get("nestedArray");
 		assertEquals(1, nestedArray.size());
 		assertEquals("color", nestedArray.get(0).get("type"));
-		assertEquals("yellow", nestedArray.get(0).get("value"));
+		assertEquals(1L, nestedArray.get(0).get("value"));
 	}
 
 	@Test
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
new file mode 100644
index 0000000..d8eba5e
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+	private static Path testPath;
+
+	@ClassRule
+	public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+	public ParquetTableSourceITCase() {
+		super(TestExecutionMode.COLLECTION);
+	}
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		testPath = createTestParquetFile(1000);
+	}
+
+	@Test
+	public void testFullScan() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
+		ParquetTableSource tableSource = createParquetTableSource(testPath);
+		batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+		String query =
+			"SELECT foo " +
+			"FROM ParquetTable";
+
+		Table table = batchTableEnvironment.sqlQuery(query);
+		DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
+		List<Row> result = dataSet.collect();
+
+		assertEquals(1000, result.size());
+	}
+
+	@Test
+	public void testScanWithProjectionAndFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
+		ParquetTableSource tableSource = createParquetTableSource(testPath);
+		batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+		String query =
+			"SELECT foo " +
+			"FROM ParquetTable WHERE bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50";
+
+		Table table = batchTableEnvironment.sqlQuery(query);
+		DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
+		List<Row> result = dataSet.collect();
+
+		assertEquals(21, result.size());
+	}
+
+	/**
+	 * Creates a test ParquetTableSource that reads the file created by {@link #createTestParquetFile(int)}.
+	 */
+	private ParquetTableSource createParquetTableSource(Path path) throws IOException {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+		ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+			.path(path.getPath())
+			.forParquetSchema(nestedSchema)
+			.build();
+		return parquetTableSource;
+	}
+
+	/**
+	 * Creates a test Parquet file with the given number of rows.
+	 */
+	private static Path createTestParquetFile(int numberOfRows) throws Exception {
+		List<IndexedRecord> records = TestUtil.createRecordList(numberOfRows);
+		Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records);
+		return path;
+	}
+}
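
Projection push-down can be exercised directly as well; the planner does this for
queries such as the SELECT foo queries in the integration test above. A minimal
sketch, assuming the parquetSrc built in the sketch near the top of this mail:

    // Returns a copy of the source whose ParquetRowInputFormat reads only the
    // selected field indexes; the original source is left unchanged.
    ParquetTableSource projected = (ParquetTableSource) parquetSrc.projectFields(new int[]{0});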
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
new file mode 100644
index 0000000..1e6ebf8
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.ItemAt;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceTest extends TestUtil {
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+	private static Path testPath;
+
+	@ClassRule
+	public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		testPath = createTestParquetFile();
+	}
+
+	@Test
+	public void testGetReturnType() {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+		ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+			.path("dummy-path")
+			.forParquetSchema(nestedSchema)
+			.build();
+
+		TypeInformation<Row> returnType = parquetTableSource.getReturnType();
+		assertNotNull(returnType);
+		assertTrue(returnType instanceof RowTypeInfo);
+		RowTypeInfo rowType = (RowTypeInfo) returnType;
+		assertEquals(NESTED_ROW_TYPE, rowType);
+	}
+
+	@Test
+	public void testGetTableSchema() {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+		ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+			.path("dummy-path")
+			.forParquetSchema(nestedSchema)
+			.build();
+
+		TableSchema schema = parquetTableSource.getTableSchema();
+		assertNotNull(schema);
+
+		RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE;
+		assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames());
+		assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes());
+	}
+
+	@Test
+	public void testFieldsProjection() throws Exception {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+		ParquetTableSource projected = (ParquetTableSource) parquetTableSource.projectFields(new int[] {2, 4, 6});
+
+		// ensure a new reference is returned
+		assertNotSame(projected, parquetTableSource);
+
+		// ensure table schema is the same
+		assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema());
+
+		// ensure that table source description differs
+		assertNotEquals(parquetTableSource.explainSource(), projected.explainSource());
+
+		String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames();
+		TypeInformation[] fieldTypes =  ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes();
+		assertEquals(
+			Types.ROW_NAMED(
+				new String[] {fieldNames[2], fieldNames[4], fieldNames[6]},
+				fieldTypes[2], fieldTypes[4], fieldTypes[6]
+			),
+			projected.getReturnType()
+		);
+
+		// ensure ParquetInputFormat is configured with selected fields
+		DataSet<Row> data = projected.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+		InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+		assertTrue(inputFormat instanceof ParquetRowInputFormat);
+		ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+		assertArrayEquals(new String[]{fieldNames[2], fieldNames[4], fieldNames[6]}, parquetIF.getFieldNames());
+		assertArrayEquals(new TypeInformation<?>[]{fieldTypes[2], fieldTypes[4], fieldTypes[6]}, parquetIF.getFieldTypes());
+	}
+
+	@Test
+	public void testFieldsFilter() throws Exception {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+
+		// expressions for supported predicates
+		Expression exp1 = new GreaterThan(
+			new PlannerResolvedFieldReference("foo", Types.LONG),
+			new Literal(100L, Types.LONG));
+		Expression exp2 = new EqualTo(
+			new Literal(100L, Types.LONG),
+			new PlannerResolvedFieldReference("bar.spam", Types.LONG));
+
+		// unsupported predicate
+		Expression unsupported = new EqualTo(
+			new GetCompositeField(
+				new ItemAt(
+					new PlannerResolvedFieldReference(
+						"nestedArray",
+						ObjectArrayTypeInfo.getInfoFor(
+							Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING))),
+						new Literal(1, Types.INT)),
+						"type"),
+			new Literal("test", Types.STRING));
+		// invalid predicate
+		Expression invalidPred = new EqualTo(
+			new PlannerResolvedFieldReference("nonField", Types.LONG),
+			// some invalid, non-serializable, literal (here an object of this test class)
+			new Literal(new ParquetTableSourceTest(), Types.LONG)
+		);
+
+		List<Expression> exps = new ArrayList<>();
+		exps.add(exp1);
+		exps.add(exp2);
+		exps.add(unsupported);
+		exps.add(invalidPred);
+
+		// apply predicates on the TableSource
+		ParquetTableSource filtered = (ParquetTableSource) parquetTableSource.applyPredicate(exps);
+
+		// ensure copy is returned
+		assertNotSame(parquetTableSource, filtered);
+
+		// ensure table schema is identical
+		assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());
+
+		// ensure return type is identical
+		assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+
+		// ensure source description is not the same
+		assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
+
+		// check that pushdown was recorded
+		assertTrue(filtered.isFilterPushedDown());
+		assertFalse(parquetTableSource.isFilterPushedDown());
+
+		// ensure that supported predicates were removed from list of offered expressions
+		assertEquals(2, exps.size());
+		assertTrue(exps.contains(unsupported));
+		assertTrue(exps.contains(invalidPred));
+
+		// ensure ParquetInputFormat is correctly configured with filter
+		DataSet<Row> data = filtered.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+		InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+		assertTrue(inputFormat instanceof ParquetRowInputFormat);
+		ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+
+		// expected predicate
+		FilterPredicate a = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
+		FilterPredicate b = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
+		FilterPredicate expected = FilterApi.and(a, b);
+		// actual predicate
+		FilterPredicate predicate = parquetIF.getPredicate();
+		// check predicate
+		assertEquals(expected, predicate);
+	}
+
+	private static Path createTestParquetFile() throws Exception {
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+		Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+			Collections.singletonList(nested.f1));
+		return path;
+	}
+
+	private ParquetTableSource createNestedTestParquetTableSource(Path path) throws Exception {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+		ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+			.path(path.getPath())
+			.forParquetSchema(nestedSchema)
+			.build();
+		return parquetTableSource;
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index d021fc3..6c79605 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -279,7 +279,7 @@ public class ParquetRecordReaderTest extends TestUtil {
 		Schema arrayItemSchema = nestedArraySchema.getElementType();
 		GenericRecord item = new GenericRecordBuilder(arrayItemSchema)
 			.set("type", "nested")
-			.set("value", "nested_value").build();
+			.set("value", 1L).build();
 
 		ImmutableList.Builder<GenericRecord> list = ImmutableList.builder();
 		list.add(item);
@@ -310,7 +310,7 @@ public class ParquetRecordReaderTest extends TestUtil {
 
 		Row nestedRow = (Row) result[0];
 		assertEquals("nested", nestedRow.getField(0));
-		assertEquals("nested_value", nestedRow.getField(1));
+		assertEquals(1L, nestedRow.getField(1));
 	}
 
 	private Schema unWrapSchema(Schema o) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
index ce13c8d..10db6d2 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.formats.parquet.utils;
 
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
 
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -32,7 +27,6 @@ import org.apache.parquet.schema.Type;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -40,27 +34,6 @@ import static org.junit.Assert.assertEquals;
  * Simple test case for conversion between Parquet schema and Flink date types.
  */
 public class ParquetSchemaConverterTest extends TestUtil {
-	private final TypeInformation<Row> simplyRowType = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
-		BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
-
-	private final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(new String[] {"type", "value"},
-		BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
-	@SuppressWarnings("unchecked")
-	private final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
-		Types.ROW_NAMED(new String[] {"type", "value"},
-			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
-	@SuppressWarnings("unchecked")
-	private final TypeInformation<Row> nestedRowType = Types.ROW_NAMED(
-		new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
-		BasicTypeInfo.LONG_TYPE_INFO,
-		Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
-		Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
-		BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
-		BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
-		nestedMap,
-		nestedArray);
 
 	private final Type[] simpleStandardTypes = {
 		org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
@@ -102,8 +75,8 @@ public class ParquetSchemaConverterTest extends TestUtil {
 		org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup()
 			.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
 				.as(OriginalType.UTF8).named("type"))
-			.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
-				.as(OriginalType.UTF8).named("value"))
+			.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+				.as(OriginalType.INT_64).named("value"))
 			.named("element")).as(OriginalType.LIST)
 			.named("nestedArray")
 	};
@@ -112,25 +85,25 @@ public class ParquetSchemaConverterTest extends TestUtil {
 	public void testSimpleSchemaConversion() {
 		MessageType simpleType = new MessageType("simple", simpleStandardTypes);
 		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
-		assertEquals(simplyRowType, rowTypeInfo);
+		assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo);
 	}
 
 	@Test
 	public void testNestedSchemaConversion() {
 		MessageType nestedTypes = new MessageType("nested", this.nestedTypes);
 		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
-		assertEquals(nestedRowType, rowTypeInfo);
+		assertEquals(NESTED_ROW_TYPE, rowTypeInfo);
 	}
 
 	@Test
 	public void testSimpleRowTypeConversion() {
-		MessageType simpleSchema = ParquetSchemaConverter.toParquetType(simplyRowType, true);
+		MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true);
 		assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields());
 	}
 
 	@Test
 	public void testNestedRowTypeConversion() {
-		MessageType nestedSchema = ParquetSchemaConverter.toParquetType(nestedRowType, true);
+		MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true);
 		assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields());
 	}
 }
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index ed64041..6b5cf2a 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -19,6 +19,10 @@
 package org.apache.flink.formats.parquet.utils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.generated.ArrayItem;
@@ -52,11 +56,33 @@ import java.util.UUID;
  * Utilities for testing schema conversion and test parquet file creation.
  */
 public class TestUtil {
+	private static final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(
+		new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
+
+	@SuppressWarnings("unchecked")
+	private static final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
+		Types.ROW_NAMED(new String[] {"type", "value"},
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
 	@ClassRule
 	public static TemporaryFolder tempRoot = new TemporaryFolder();
 	public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
 	public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
 
+	public static final TypeInformation<Row> SIMPLE_ROW_TYPE = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
+		BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
+
+	@SuppressWarnings("unchecked")
+	public static final TypeInformation<Row> NESTED_ROW_TYPE = Types.ROW_NAMED(
+		new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
+		BasicTypeInfo.LONG_TYPE_INFO,
+		Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+		Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
+		BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
+		BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+		nestedMap,
+		nestedArray);
+
 	public static Path createTempParquetFile(File folder, Schema schema, List<IndexedRecord> records) throws IOException {
 		Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
 		ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
@@ -96,7 +122,7 @@ public class TestUtil {
 
 		final ArrayItem arrayItem = ArrayItem.newBuilder()
 			.setType("color")
-			.setValue("yellow").build();
+			.setValue(1L).build();
 
 		final MapItem mapItem = MapItem.newBuilder()
 			.setType("map")
@@ -129,7 +155,7 @@ public class TestUtil {
 
 		final Row arrayItemRow = new Row(2);
 		arrayItemRow.setField(0, "color");
-		arrayItemRow.setField(1, "yellow");
+		arrayItemRow.setField(1, 1L);
 
 		final Row mapItemRow = new Row(2);
 		mapItemRow.setField(0, "map");
@@ -154,6 +180,48 @@ public class TestUtil {
 		return t;
 	}
 
+	/**
+	 * Creates a list of NestedRecords conforming to the NESTED_SCHEMA.
+	 */
+	public static List<IndexedRecord> createRecordList(long numberOfRows) {
+		List<IndexedRecord> records = new ArrayList<>();
+		for (long i = 0; i < numberOfRows; i++) {
+			final Bar bar = Bar.newBuilder()
+				.setSpam(i).build();
+
+			final ArrayItem arrayItem = ArrayItem.newBuilder()
+				.setType("color")
+				.setValue(i).build();
+
+			final MapItem mapItem = MapItem.newBuilder()
+				.setType("map")
+				.setValue("hashMap").build();
+
+			List<ArrayItem> nestedArray = new ArrayList<>();
+			nestedArray.add(arrayItem);
+
+			Map<CharSequence, MapItem> nestedMap = new HashMap<>();
+			nestedMap.put("mapItem", mapItem);
+
+			List<Long> longArray = new ArrayList<>();
+			longArray.add(i);
+
+			List<CharSequence> stringArray = new ArrayList<>();
+			stringArray.add("String");
+
+			final NestedRecord nestedRecord = NestedRecord.newBuilder()
+				.setBar(bar)
+				.setNestedArray(nestedArray)
+				.setStrArray(stringArray)
+				.setNestedMap(nestedMap)
+				.setArr(longArray).build();
+
+			records.add(nestedRecord);
+		}
+
+		return records;
+	}
+
 	public static RuntimeContext getMockRuntimeContext() {
 		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
 		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
index 2517c61..eb60752 100644
--- a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
+++ b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
@@ -26,7 +26,7 @@
         "name": "ArrayItem",
         "fields": [
            {"name": "type", "type": "string"},
-           {"name": "value", "type": "string"}]}
+           {"name": "value", "type": "long"}]}
         }]
       }
   ],