You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/07/10 16:42:44 UTC
[flink] branch master updated: [FLINK-7244][parquet] Add
ParquetTableSource.
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 38e5e81 [FLINK-7244][parquet] Add ParquetTableSource.
38e5e81 is described below
commit 38e5e8161a9c763cf7df3b642830b5a97371bb00
Author: Peter Huang <hu...@gmail.com>
AuthorDate: Sun Mar 24 23:19:18 2019 -0700
[FLINK-7244][parquet] Add ParquetTableSource.
This closes #8064.
---
flink-formats/flink-parquet/pom.xml | 28 +-
.../flink/formats/parquet/ParquetInputFormat.java | 16 +-
.../formats/parquet/ParquetRowInputFormat.java | 4 +-
.../flink/formats/parquet/ParquetTableSource.java | 568 +++++++++++++++++++++
.../parquet/utils/ParquetSchemaConverter.java | 2 +-
.../formats/parquet/ParquetMapInputFormatTest.java | 2 +-
.../formats/parquet/ParquetTableSourceITCase.java | 116 +++++
.../formats/parquet/ParquetTableSourceTest.java | 234 +++++++++
.../parquet/utils/ParquetRecordReaderTest.java | 4 +-
.../parquet/utils/ParquetSchemaConverterTest.java | 39 +-
.../flink/formats/parquet/utils/TestUtil.java | 72 ++-
.../src/test/resources/avro/nested.avsc | 2 +-
12 files changed, 1041 insertions(+), 46 deletions(-)
diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 4a2fd32..b43715d 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -29,7 +29,7 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-parquet</artifactId>
+ <artifactId>flink-parquet_${scala.binary.version}</artifactId>
<name>flink-parquet</name>
<packaging>jar</packaging>
@@ -39,7 +39,6 @@ under the License.
</properties>
<dependencies>
-
<!-- Flink dependencies -->
<dependency>
@@ -49,6 +48,31 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Table ecosystem -->
+ <!-- Projects depending on this project won't depend on flink-table-*. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
<!-- Parquet Dependencies -->
<dependency>
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
index e7484cb..554313e 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.parquet;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +36,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
@@ -85,6 +87,8 @@ public abstract class ParquetInputFormat<E>
private String[] fieldNames;
+ private FilterPredicate filterPredicate;
+
private transient Counter recordConsumed;
private transient MessageType expectedFileSchema;
@@ -143,6 +147,10 @@ public abstract class ParquetInputFormat<E>
this.fieldTypes = selectFieldTypes;
}
+	/**
+	 * Sets the Parquet filter predicate to apply while reading records.
+	 * Passing {@code null} means no filtering is applied.
+	 *
+	 * @param predicate the Parquet filter predicate, may be {@code null}
+	 */
+	public void setFilterPredicate(FilterPredicate predicate) {
+		filterPredicate = predicate;
+	}
+
@Override
public Tuple2<Long, Long> getCurrentState() {
return parquetRecordReader.getCurrentReadPosition();
@@ -164,7 +172,8 @@ public abstract class ParquetInputFormat<E>
"Escaped the file split [%s] due to mismatch of file schema to expected result schema",
split.getPath().toString()));
} else {
- this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+ this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema,
+ filterPredicate == null ? FilterCompat.NOOP : FilterCompat.get(filterPredicate));
this.parquetRecordReader.initialize(fileReader, configuration);
this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);
@@ -203,6 +212,11 @@ public abstract class ParquetInputFormat<E>
return fieldTypes;
}
+	/**
+	 * Returns the filter predicate configured through {@code setFilterPredicate}.
+	 *
+	 * @return the configured predicate, or {@code null} if none has been set
+	 */
+	@VisibleForTesting
+	protected FilterPredicate getPredicate() {
+		return filterPredicate;
+	}
+
@Override
public void close() throws IOException {
if (parquetRecordReader != null) {
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
index f010a50..13da9c7 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
@@ -31,16 +31,14 @@ import org.apache.parquet.schema.MessageType;
*/
public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
private static final long serialVersionUID = 11L;
- private RowTypeInfo returnType;
public ParquetRowInputFormat(Path path, MessageType messageType) {
super(path, messageType);
- this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
}
@Override
public TypeInformation<Row> getProducedType() {
- return returnType;
+ return new RowTypeInfo(getFieldTypes(), getFieldNames());
}
@Override
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
new file mode 100644
index 0000000..0b5d168
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.And;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.BinaryExpression;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A TableSource to read Parquet files.
+ *
+ * <p>The {@link ParquetTableSource} supports projection and filter push-down.</p>
+ *
+ * <p>An {@link ParquetTableSource} is used as shown in the example below.
+ *
+ * <pre>
+ * {@code
+ * ParquetTableSource orcSrc = ParquetTableSource.builder()
+ * .path("file:///my/data/file.parquet")
+ * .schema(messageType)
+ * .build();
+ *
+ * tEnv.registerTableSource("parquetTable", orcSrc);
+ * Table res = tableEnv.sqlQuery("SELECT * FROM parquetTable");
+ * }
+ * </pre>
+ */
+public class ParquetTableSource
+ implements BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ParquetTableSource.class);
+
+ // path to read Parquet files from
+ private final String path;
+ // schema of the Parquet file
+ private final MessageType parquetSchema;
+ // the schema of table
+ private final TableSchema tableSchema;
+ // the configuration to read the file
+ private final Configuration parquetConfig;
+ // type information of the data returned by the InputFormat
+ private final RowTypeInfo typeInfo;
+ // list of selected Parquet fields to return
+ @Nullable
+ private final int[] selectedFields;
+ // predicate expression to apply
+ @Nullable
+ private final FilterPredicate predicate;
+ // flag whether a path is recursively enumerated
+ private final boolean recursiveEnumeration;
+
+ private boolean isFilterPushedDown;
+
+ private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+ boolean recursiveEnumeration) {
+ this(path, parquetSchema, configuration, recursiveEnumeration, null, null);
+ }
+
+ private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+ boolean recursiveEnumeration, @Nullable int[] selectedFields, @Nullable FilterPredicate predicate) {
+ Preconditions.checkNotNull(path, "Path must not be null.");
+ Preconditions.checkNotNull(parquetSchema, "ParquetSchema must not be null.");
+ Preconditions.checkNotNull(configuration, "Configuration must not be null");
+ this.path = path;
+ this.parquetSchema = parquetSchema;
+ this.parquetConfig = configuration;
+ this.selectedFields = selectedFields;
+ this.predicate = predicate;
+ this.recursiveEnumeration = recursiveEnumeration;
+
+ if (predicate != null) {
+ this.isFilterPushedDown = true;
+ }
+ // determine the type information from the Parquet schema
+ RowTypeInfo typeInfoFromSchema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema);
+
+ // set return type info
+ if (selectedFields == null) {
+ this.typeInfo = typeInfoFromSchema;
+ } else {
+ this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+ }
+
+ // create a TableSchema that corresponds to the Parquet schema
+ this.tableSchema = new TableSchema(
+ typeInfoFromSchema.getFieldNames(),
+ typeInfoFromSchema.getFieldTypes()
+ );
+ }
+
+ @Override
+ public TableSource<Row> projectFields(int[] fields) {
+ return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null);
+ }
+
+ @Override
+ public DataSet<Row> getDataSet(ExecutionEnvironment executionEnvironment) {
+ ParquetRowInputFormat parquetRowInputFormat = new ParquetRowInputFormat(new Path(path), parquetSchema);
+ parquetRowInputFormat.setNestedFileEnumeration(recursiveEnumeration);
+ if (selectedFields != null) {
+ parquetRowInputFormat.selectFields(typeInfo.getFieldNames());
+ }
+
+ if (predicate != null) {
+ parquetRowInputFormat.setFilterPredicate(predicate);
+ }
+
+ return executionEnvironment.createInput(parquetRowInputFormat).name(explainSource());
+ }
+
+ @Override
+ public TableSource<Row> applyPredicate(List<Expression> predicates) {
+
+ // try to convert Flink filter expressions to Parquet FilterPredicates
+ List<FilterPredicate> convertedPredicates = new ArrayList<>(predicates.size());
+ List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+
+ for (Expression toConvert : predicates) {
+ FilterPredicate convertedPredicate = toParquetPredicate(toConvert);
+ if (convertedPredicate != null) {
+ convertedPredicates.add(convertedPredicate);
+ } else {
+ unsupportedExpressions.add(toConvert);
+ }
+ }
+
+ // update list of Flink expressions to unsupported expressions
+ predicates.clear();
+ predicates.addAll(unsupportedExpressions);
+
+ // construct single Parquet FilterPredicate
+ FilterPredicate parquetPredicate = null;
+ if (!convertedPredicates.isEmpty()) {
+ // concat converted predicates with AND
+ parquetPredicate = convertedPredicates.get(0);
+
+ for (FilterPredicate converted : convertedPredicates.subList(1, convertedPredicates.size())) {
+ parquetPredicate = FilterApi.and(parquetPredicate, converted);
+ }
+ }
+
+ // create and return a new ParquetTableSource with Parquet FilterPredicate
+ return new ParquetTableSource(path, parquetSchema, this.parquetConfig, recursiveEnumeration, selectedFields, parquetPredicate);
+ }
+
+ @Override
+ public boolean isFilterPushedDown() {
+ return isFilterPushedDown;
+ }
+
+ @Override
+ public TypeInformation<Row> getReturnType() {
+ return typeInfo;
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public String explainSource() {
+ return "ParquetFile[path=" + path + ", schema=" + parquetSchema + ", filter=" + predicateString()
+ + ", typeInfo=" + typeInfo + "]";
+ }
+
+ private String predicateString() {
+ if (predicate != null) {
+ return predicate.toString();
+ } else {
+ return "TRUE";
+ }
+ }
+
+ /**
+ * Converts Flink Expression to Parquet FilterPredicate.
+ */
+ @Nullable
+ private FilterPredicate toParquetPredicate(Expression exp) {
+ if (exp instanceof Not) {
+ FilterPredicate c = toParquetPredicate(((Not) exp).child());
+ if (c == null) {
+ return null;
+ } else {
+ return FilterApi.not(c);
+ }
+ } else if (exp instanceof BinaryComparison) {
+ BinaryComparison binComp = (BinaryComparison) exp;
+
+ if (!isValid(binComp)) {
+ // unsupported literal Type
+ LOG.debug("Unsupported predict [{}] cannot be pushed to ParquetTableSource.", exp);
+ return null;
+ }
+
+ boolean onRight = literalOnRight(binComp);
+ Tuple2<Column, Comparable> columnPair = extractColumnAndLiteral(binComp);
+
+ if (columnPair != null) {
+ if (exp instanceof EqualTo) {
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.eq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.eq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.eq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.eq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ } else if (columnPair.f0 instanceof BooleanColumn) {
+ return FilterApi.eq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+ } else if (columnPair.f0 instanceof BinaryColumn) {
+ return FilterApi.eq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+ }
+ } else if (exp instanceof NotEqualTo) {
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.notEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.notEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.notEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.notEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ } else if (columnPair.f0 instanceof BooleanColumn) {
+ return FilterApi.notEq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+ } else if (columnPair.f0 instanceof BinaryColumn) {
+ return FilterApi.notEq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+ }
+ } else if (exp instanceof GreaterThan) {
+ if (onRight) {
+ return greaterThan(exp, columnPair);
+ } else {
+ lessThan(exp, columnPair);
+ }
+ } else if (exp instanceof GreaterThanOrEqual) {
+ if (onRight) {
+ return greaterThanOrEqual(exp, columnPair);
+ } else {
+ return lessThanOrEqual(exp, columnPair);
+ }
+ } else if (exp instanceof LessThan) {
+ if (onRight) {
+ return lessThan(exp, columnPair);
+ } else {
+ return greaterThan(exp, columnPair);
+ }
+ } else if (exp instanceof LessThanOrEqual) {
+ if (onRight) {
+ return lessThanOrEqual(exp, columnPair);
+ } else {
+ return greaterThanOrEqual(exp, columnPair);
+ }
+ } else {
+ // Unsupported Predicate
+ LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+ return null;
+ }
+ }
+ } else if (exp instanceof BinaryExpression) {
+ if (exp instanceof And) {
+ LOG.debug("All of the predicates should be in CNF. Found an AND expression.", exp);
+ } else if (exp instanceof Or) {
+ FilterPredicate c1 = toParquetPredicate(((Or) exp).left());
+ FilterPredicate c2 = toParquetPredicate(((Or) exp).right());
+
+ if (c1 == null || c2 == null) {
+ return null;
+ } else {
+ return FilterApi.or(c1, c2);
+ }
+ } else {
+ // Unsupported Predicate
+ LOG.debug("Unsupported predicate [{}] cannot be pushed into ParquetTableSource.", exp);
+ return null;
+ }
+ }
+
+ return null;
+ }
+
+ @Nullable
+ private FilterPredicate greaterThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+ Preconditions.checkArgument(exp instanceof GreaterThan, "exp has to be GreaterThan");
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.gt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.gt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.gt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.gt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ }
+
+ return null;
+ }
+
+ @Nullable
+ private FilterPredicate lessThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+ Preconditions.checkArgument(exp instanceof LessThan, "exp has to be LessThan");
+
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.lt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.lt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.lt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.lt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ }
+
+ return null;
+ }
+
+ @Nullable
+ private FilterPredicate greaterThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+ Preconditions.checkArgument(exp instanceof GreaterThanOrEqual, "exp has to be GreaterThanOrEqual");
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.gtEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.gtEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.gtEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.gtEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ }
+
+ return null;
+ }
+
+ @Nullable
+ private FilterPredicate lessThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+ Preconditions.checkArgument(exp instanceof LessThanOrEqual, "exp has to be LessThanOrEqual");
+ if (columnPair.f0 instanceof IntColumn) {
+ return FilterApi.ltEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+ } else if (columnPair.f0 instanceof LongColumn) {
+ return FilterApi.ltEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+ } else if (columnPair.f0 instanceof DoubleColumn) {
+ return FilterApi.ltEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+ } else if (columnPair.f0 instanceof FloatColumn) {
+ return FilterApi.ltEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+ }
+
+ return null;
+ }
+
+ private boolean isValid(BinaryComparison comp) {
+ return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+ (comp.left() instanceof Attribute && comp.right() instanceof Literal);
+ }
+
+ private boolean literalOnRight(BinaryComparison comp) {
+ if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+ return false;
+ } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+ return true;
+ } else {
+ throw new RuntimeException("Invalid binary comparison.");
+ }
+ }
+
+ private TypeInformation<?> getLiteralType(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return ((Literal) comp.right()).resultType();
+ } else {
+ return ((Literal) comp.left()).resultType();
+ }
+ }
+
+ private Object getLiteral(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return ((Literal) comp.right()).value();
+ } else {
+ return ((Literal) comp.left()).value();
+ }
+ }
+
+ private String getColumnName(BinaryComparison comp) {
+ if (literalOnRight(comp)) {
+ return ((Attribute) comp.left()).name();
+ } else {
+ return ((Attribute) comp.right()).name();
+ }
+ }
+
+ @Nullable
+ private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
+ TypeInformation<?> typeInfo = getLiteralType(comp);
+ String columnName = getColumnName(comp);
+
+ // fetch literal and ensure it is comparable
+ Object value = getLiteral(comp);
+ // validate that literal is comparable
+ if (!(value instanceof Comparable)) {
+ LOG.warn("Encountered a non-comparable literal of type {}." +
+ "Cannot push predicate [{}] into ParquetTablesource." +
+ "This is a bug and should be reported.", value.getClass().getCanonicalName(), comp);
+ return null;
+ }
+
+ if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO ||
+ typeInfo == BasicTypeInfo.SHORT_TYPE_INFO ||
+ typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value);
+ } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value);
+ } else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value);
+ } else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value);
+ } else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value);
+ } else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+ return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value));
+ } else {
+ // unsupported type
+ return null;
+ }
+ }
+
+ // Builder
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Constructs an {@link ParquetTableSource}.
+ */
+ public static class Builder {
+
+ private String path;
+
+ private MessageType schema;
+
+ private Configuration config;
+
+ private boolean recursive = true;
+
+ /**
+ * Sets the path of Parquet files.
+ * If the path is specifies a directory, it will be recursively enumerated.
+ *
+ * @param path the path of the Parquet files.
+ * @return The Builder
+ */
+ public Builder path(String path) {
+ Preconditions.checkNotNull(path, "Path must not be null");
+ Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+ this.path = path;
+ return this;
+ }
+
+ /**
+ * Sets the path of the Parquet files.
+ *
+ * @param path The path of the Parquet files
+ * @param recursive Flag whether to enumerate
+ * @return The Builder
+ */
+ public Builder path(String path, boolean recursive) {
+ Preconditions.checkNotNull(path, "Path must not be null");
+ Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+ this.path = path;
+ this.recursive = recursive;
+ return this;
+ }
+
+ /**
+ * Sets the Parquet schema of the files to read as a String.
+ *
+ * @param parquetSchema The parquet schema of the files to read as a String.
+ * @return The Builder
+ */
+ public Builder forParquetSchema(MessageType parquetSchema) {
+ Preconditions.checkNotNull(parquetSchema, "Parquet schema must not be null");
+ this.schema = parquetSchema;
+ return this;
+ }
+
+ /**
+ * Sets a Hadoop {@link Configuration} for the Parquet Reader. If no configuration is configured,
+ * an empty configuration is used.
+ *
+ * @param config The Hadoop Configuration for the Parquet reader.
+ * @return The Builder
+ */
+ public Builder withConfiguration(Configuration config) {
+ Preconditions.checkNotNull(config, "Configuration must not be null.");
+ this.config = config;
+ return this;
+ }
+
+ /**
+ * Builds the ParquetTableSource for this builder.
+ *
+ * @return The ParquetTableSource for this builder.
+ */
+ public ParquetTableSource build() {
+ Preconditions.checkNotNull(path, "Path must not be null");
+ Preconditions.checkNotNull(schema, "Parquet schema must not be null");
+ if (config == null) {
+ this.config = new Configuration();
+ }
+
+ return new ParquetTableSource(this.path, this.schema, this.config, this.recursive);
+ }
+ }
+}
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index 35e1977..084c060 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -43,7 +43,7 @@ import java.util.List;
* Schema converter converts Parquet schema to and from Flink internal types.
*/
public class ParquetSchemaConverter {
- private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);
public static final String MAP_VALUE = "value";
public static final String LIST_ARRAY_TYPE = "array";
public static final String LIST_ELEMENT = "element";
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
index 2911166..f36b12c 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -77,7 +77,7 @@ public class ParquetMapInputFormatTest {
List<Map<String, String>> nestedArray = (List<Map<String, String>>) map.get("nestedArray");
assertEquals(1, nestedArray.size());
assertEquals("color", nestedArray.get(0).get("type"));
- assertEquals("yellow", nestedArray.get(0).get("value"));
+ assertEquals(1L, nestedArray.get(0).get("value"));
}
@Test
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
new file mode 100644
index 0000000..d8eba5e
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+	/** Rows in the test file; row i has bar.spam = i and arr = [i] (see TestUtil#createRecordList). */
+	private static final int NUM_ROWS = 1000;
+	private static Path testPath;
+
+	@ClassRule
+	public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+	public ParquetTableSourceITCase() {
+		super(TestExecutionMode.COLLECTION);
+	}
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		testPath = createTestParquetFile(NUM_ROWS);
+	}
+
+	@Test
+	public void testFullScan() throws Exception {
+		List<Row> result = executeQuery("SELECT foo FROM ParquetTable");
+
+		assertEquals(NUM_ROWS, result.size());
+	}
+
+	@Test
+	public void testScanWithProjectionAndFilter() throws Exception {
+		List<Row> result = executeQuery(
+			"SELECT foo " +
+			"FROM ParquetTable WHERE bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50");
+
+		// bar.spam and arr[1] both equal the row index, so exactly rows 30..50 (21 rows) match.
+		assertEquals(21, result.size());
+	}
+
+	/**
+	 * Registers the test file as table "ParquetTable" in a fresh table environment, runs the query and collects the result.
+	 *
+	 * @param query SQL query against "ParquetTable"
+	 * @return all rows produced by the query
+	 */
+	private List<Row> executeQuery(String query) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
+		batchTableEnvironment.registerTableSource("ParquetTable", createParquetTableSource(testPath));
+
+		Table table = batchTableEnvironment.sqlQuery(query);
+		DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
+		return dataSet.collect();
+	}
+
+	/**
+	 * Create test Parquet table source that reads a test file created by {@link #createTestParquetFile(int)}.
+	 */
+	private ParquetTableSource createParquetTableSource(Path path) throws IOException {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+		return ParquetTableSource.builder()
+			.path(path.getPath())
+			.forParquetSchema(nestedSchema)
+			.build();
+	}
+
+	/**
+	 * Create a test Parquet file with a given number of rows generated by
+	 * {@link TestUtil#createRecordList(long)}.
+	 */
+	private static Path createTestParquetFile(int numberOfRows) throws Exception {
+		List<IndexedRecord> records = TestUtil.createRecordList(numberOfRows);
+		return TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records);
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
new file mode 100644
index 0000000..1e6ebf8
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.ItemAt;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceTest extends TestUtil {
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+	private static Path testPath;
+
+	@ClassRule
+	public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		testPath = createTestParquetFile();
+	}
+
+	@Test
+	public void testGetReturnType() {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource("dummy-path");
+
+		TypeInformation<Row> returnType = parquetTableSource.getReturnType();
+		assertNotNull(returnType);
+		assertTrue(returnType instanceof RowTypeInfo);
+		RowTypeInfo rowType = (RowTypeInfo) returnType;
+		assertEquals(NESTED_ROW_TYPE, rowType);
+	}
+
+	@Test
+	public void testGetTableSchema() {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource("dummy-path");
+
+		TableSchema schema = parquetTableSource.getTableSchema();
+		assertNotNull(schema);
+
+		RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE;
+		assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames());
+		assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes());
+	}
+
+	@Test
+	public void testFieldsProjection() throws Exception {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+		ParquetTableSource projected = (ParquetTableSource) parquetTableSource.projectFields(new int[] {2, 4, 6});
+
+		// ensure a new reference is returned
+		assertNotSame(projected, parquetTableSource);
+
+		// ensure table schema is the same
+		assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema());
+
+		// ensure that table source description differs
+		assertNotEquals(parquetTableSource.explainSource(), projected.explainSource());
+
+		String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames();
+		TypeInformation[] fieldTypes = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes();
+		assertEquals(
+			Types.ROW_NAMED(
+				new String[] {fieldNames[2], fieldNames[4], fieldNames[6]},
+				fieldTypes[2], fieldTypes[4], fieldTypes[6]
+			),
+			projected.getReturnType()
+		);
+
+		// ensure ParquetInputFormat is configured with selected fields
+		DataSet<Row> data = projected.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+		InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+		assertTrue(inputFormat instanceof ParquetRowInputFormat);
+		ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+		assertArrayEquals(new String[]{fieldNames[2], fieldNames[4], fieldNames[6]}, parquetIF.getFieldNames());
+		assertArrayEquals(new TypeInformation<?>[]{fieldTypes[2], fieldTypes[4], fieldTypes[6]}, parquetIF.getFieldTypes());
+	}
+
+	@Test
+	public void testFieldsFilter() throws Exception {
+		ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+
+		// expressions for supported predicates
+		Expression exp1 = new GreaterThan(
+			new PlannerResolvedFieldReference("foo", Types.LONG),
+			new Literal(100L, Types.LONG));
+		Expression exp2 = new EqualTo(
+			new Literal(100L, Types.LONG),
+			new PlannerResolvedFieldReference("bar.spam", Types.LONG));
+
+		// unsupported predicate
+		Expression unsupported = new EqualTo(
+			new GetCompositeField(
+				new ItemAt(
+					new PlannerResolvedFieldReference(
+						"nestedArray",
+						ObjectArrayTypeInfo.getInfoFor(
+							Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING))),
+					new Literal(1, Types.INT)),
+				"type"),
+			new Literal("test", Types.STRING));
+		// invalid predicate
+		Expression invalidPred = new EqualTo(
+			new PlannerResolvedFieldReference("nonField", Types.LONG),
+			// some invalid, non-serializable, literal (here an object of this test class)
+			new Literal(new ParquetTableSourceTest(), Types.LONG)
+		);
+
+		List<Expression> exps = new ArrayList<>();
+		exps.add(exp1);
+		exps.add(exp2);
+		exps.add(unsupported);
+		exps.add(invalidPred);
+
+		// apply predicates on TableSource
+		ParquetTableSource filtered = (ParquetTableSource) parquetTableSource.applyPredicate(exps);
+
+		// ensure copy is returned
+		assertNotSame(parquetTableSource, filtered);
+
+		// ensure table schema is identical
+		assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());
+
+		// ensure return type is identical
+		assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+
+		// ensure source description is not the same
+		assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
+
+		// check that pushdown was recorded
+		assertTrue(filtered.isFilterPushedDown());
+		assertFalse(parquetTableSource.isFilterPushedDown());
+
+		// ensure that supported predicates were removed from list of offered expressions
+		assertEquals(2, exps.size());
+		assertTrue(exps.contains(unsupported));
+		assertTrue(exps.contains(invalidPred));
+
+		// ensure ParquetInputFormat is correctly configured with filter
+		DataSet<Row> data = filtered.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+		InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+		assertTrue(inputFormat instanceof ParquetRowInputFormat);
+		ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+
+		// expected predicate
+		FilterPredicate a = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
+		FilterPredicate b = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
+		FilterPredicate expected = FilterApi.and(a, b);
+		// actual predicate
+		FilterPredicate predicate = parquetIF.getPredicate();
+		// check predicate
+		assertEquals(expected, predicate);
+	}
+
+	private static Path createTestParquetFile() throws Exception {
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+		return createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA, Collections.singletonList(nested.f1));
+	}
+
+	/**
+	 * Creates a table source for {@link TestUtil#NESTED_SCHEMA} that reads the file at the given path.
+	 */
+	private static ParquetTableSource createNestedTestParquetTableSource(Path path) {
+		return createNestedTestParquetTableSource(path.getPath());
+	}
+
+	/**
+	 * Creates a table source for {@link TestUtil#NESTED_SCHEMA} that reads from the given path string.
+	 * Tests that only inspect schema information pass a dummy path here.
+	 */
+	private static ParquetTableSource createNestedTestParquetTableSource(String path) {
+		MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+		return ParquetTableSource.builder()
+			.path(path)
+			.forParquetSchema(nestedSchema)
+			.build();
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index d021fc3..6c79605 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -279,7 +279,7 @@ public class ParquetRecordReaderTest extends TestUtil {
Schema arrayItemSchema = nestedArraySchema.getElementType();
GenericRecord item = new GenericRecordBuilder(arrayItemSchema)
.set("type", "nested")
- .set("value", "nested_value").build();
+ .set("value", 1L).build();
ImmutableList.Builder<GenericRecord> list = ImmutableList.builder();
list.add(item);
@@ -310,7 +310,7 @@ public class ParquetRecordReaderTest extends TestUtil {
Row nestedRow = (Row) result[0];
assertEquals("nested", nestedRow.getField(0));
- assertEquals("nested_value", nestedRow.getField(1));
+ assertEquals(1L, nestedRow.getField(1));
}
private Schema unWrapSchema(Schema o) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
index ce13c8d..10db6d2 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -18,12 +18,7 @@
package org.apache.flink.formats.parquet.utils;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
@@ -32,7 +27,6 @@ import org.apache.parquet.schema.Type;
import org.junit.Test;
import java.util.Arrays;
-import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -40,27 +34,6 @@ import static org.junit.Assert.assertEquals;
* Simple test case for conversion between Parquet schema and Flink data types.
*/
public class ParquetSchemaConverterTest extends TestUtil {
- private final TypeInformation<Row> simplyRowType = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
- BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
-
- private final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(new String[] {"type", "value"},
- BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
- @SuppressWarnings("unchecked")
- private final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
- Types.ROW_NAMED(new String[] {"type", "value"},
- BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
- @SuppressWarnings("unchecked")
- private final TypeInformation<Row> nestedRowType = Types.ROW_NAMED(
- new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
- BasicTypeInfo.LONG_TYPE_INFO,
- Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
- Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
- BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
- BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
- nestedMap,
- nestedArray);
private final Type[] simpleStandardTypes = {
org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
@@ -102,8 +75,8 @@ public class ParquetSchemaConverterTest extends TestUtil {
org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup()
.addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
.as(OriginalType.UTF8).named("type"))
- .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
- .as(OriginalType.UTF8).named("value"))
+ .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+ .as(OriginalType.INT_64).named("value"))
.named("element")).as(OriginalType.LIST)
.named("nestedArray")
};
@@ -112,25 +85,25 @@ public class ParquetSchemaConverterTest extends TestUtil {
public void testSimpleSchemaConversion() {
MessageType simpleType = new MessageType("simple", simpleStandardTypes);
RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
- assertEquals(simplyRowType, rowTypeInfo);
+ assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo);
}
@Test
public void testNestedSchemaConversion() {
MessageType nestedTypes = new MessageType("nested", this.nestedTypes);
RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
- assertEquals(nestedRowType, rowTypeInfo);
+ assertEquals(NESTED_ROW_TYPE, rowTypeInfo);
}
@Test
public void testSimpleRowTypeConversion() {
- MessageType simpleSchema = ParquetSchemaConverter.toParquetType(simplyRowType, true);
+ MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true);
assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields());
}
@Test
public void testNestedRowTypeConversion() {
- MessageType nestedSchema = ParquetSchemaConverter.toParquetType(nestedRowType, true);
+ MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true);
assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields());
}
}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index ed64041..6b5cf2a 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -19,6 +19,10 @@
package org.apache.flink.formats.parquet.utils;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.generated.ArrayItem;
@@ -52,11 +56,33 @@ import java.util.UUID;
* Utilities for testing schema conversion and test parquet file creation.
*/
public class TestUtil {
+ // nestedArray and nestedMap must stay above NESTED_ROW_TYPE: static initializers run in textual order.
+ /**
+ * Type of the "nestedArray" field: an array of rows with a STRING "type" and a LONG "value",
+ * matching the ArrayItem record in nested.avsc.
+ */
+ private static final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(
+ new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
+
+ /** Type of the "nestedMap" field: STRING keys mapped to rows with a STRING "type" and a STRING "value". */
+ @SuppressWarnings("unchecked")
+ private static final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
+ Types.ROW_NAMED(new String[] {"type", "value"},
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
@ClassRule
public static TemporaryFolder tempRoot = new TemporaryFolder();
+ /** Avro test schemas loaded by name via getTestSchema (nested.avsc lives under src/test/resources/avro). */
public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+ /**
+ * Flink row type with fields foo (LONG), bar (STRING) and arr (LONG[]).
+ * NOTE(review): presumably mirrors SIMPLE_SCHEMA; verify against simple.avsc.
+ */
+ public static final TypeInformation<Row> SIMPLE_ROW_TYPE = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
+ BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
+
+ /**
+ * Flink row type corresponding to NESTED_SCHEMA, with fields foo, spamMap, bar, arr, strArray,
+ * nestedMap and nestedArray.
+ */
+ @SuppressWarnings("unchecked")
+ public static final TypeInformation<Row> NESTED_ROW_TYPE = Types.ROW_NAMED(
+ new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
+ BasicTypeInfo.LONG_TYPE_INFO,
+ Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
+ BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
+ BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+ nestedMap,
+ nestedArray);
+
public static Path createTempParquetFile(File folder, Schema schema, List<IndexedRecord> records) throws IOException {
Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
@@ -96,7 +122,7 @@ public class TestUtil {
final ArrayItem arrayItem = ArrayItem.newBuilder()
.setType("color")
- .setValue("yellow").build();
+ .setValue(1L).build();
final MapItem mapItem = MapItem.newBuilder()
.setType("map")
@@ -129,7 +155,7 @@ public class TestUtil {
final Row arrayItemRow = new Row(2);
arrayItemRow.setField(0, "color");
- arrayItemRow.setField(1, "yellow");
+ arrayItemRow.setField(1, 1L);
final Row mapItemRow = new Row(2);
mapItemRow.setField(0, "map");
@@ -154,6 +180,48 @@ public class TestUtil {
return t;
}
+	/**
+	 * Creates {@code numberOfRows} {@link NestedRecord}s for use with {@link #NESTED_SCHEMA}.
+	 *
+	 * <p>The record at index {@code i} has {@code bar.spam = i}, {@code arr = [i]} and a single
+	 * {@code nestedArray} element with type "color" and value {@code i}. Its {@code strArray} holds
+	 * "String" and its {@code nestedMap} maps "mapItem" to a "map"/"hashMap" item; {@code foo} is not set.
+	 *
+	 * @param numberOfRows number of records to create; a non-positive value yields an empty list
+	 * @return a mutable list with the created records, in index order
+	 */
+	public static List<IndexedRecord> createRecordList(long numberOfRows) {
+		List<IndexedRecord> records = new ArrayList<>(0);
+		for (long row = 0; row < numberOfRows; row++) {
+			List<ArrayItem> arrayItems = new ArrayList<>();
+			arrayItems.add(ArrayItem.newBuilder()
+				.setType("color")
+				.setValue(row)
+				.build());
+
+			Map<CharSequence, MapItem> mapItems = new HashMap<>();
+			mapItems.put("mapItem", MapItem.newBuilder()
+				.setType("map")
+				.setValue("hashMap")
+				.build());
+
+			List<Long> longs = new ArrayList<>();
+			longs.add(row);
+
+			List<CharSequence> strings = new ArrayList<>();
+			strings.add("String");
+
+			records.add(NestedRecord.newBuilder()
+				.setBar(Bar.newBuilder().setSpam(row).build())
+				.setNestedArray(arrayItems)
+				.setStrArray(strings)
+				.setNestedMap(mapItems)
+				.setArr(longs)
+				.build());
+		}
+		return records;
+	}
+
public static RuntimeContext getMockRuntimeContext() {
RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
index 2517c61..eb60752 100644
--- a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
+++ b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
@@ -26,7 +26,7 @@
"name": "ArrayItem",
"fields": [
{"name": "type", "type": "string"},
- {"name": "value", "type": "string"}]}
+ {"name": "value", "type": "long"}]}
}]
}
],