You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/10 10:39:52 UTC

[GitHub] [flink] lirui-apache commented on a change in pull request #13479: [FLINK-19414][parquet] Introduce ParquetColumnarRowInputFormat

lirui-apache commented on a change in pull request #13479:
URL: https://github.com/apache/flink/pull/13479#discussion_r502767584



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
+import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;
+import org.apache.flink.formats.parquet.vector.ParquetDecimalVector;
+import org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createColumnReader;
+import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Parquet {@link BulkFormat} that reads data from the file to {@link VectorizedColumnBatch} in
+ * vectorized mode.
+ */
+public abstract class ParquetVectorizedInputFormat<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableConfiguration hadoopConfig;
+	private final String[] projectedFields;
+	private final LogicalType[] projectedTypes;
+	private final ColumnBatchFactory batchFactory;
+	private final int batchSize;
+	private final boolean isUtcTimestamp;
+	private final boolean isCaseSensitive;
+
+	public ParquetVectorizedInputFormat(
+			SerializableConfiguration hadoopConfig,
+			String[] projectedFields,
+			LogicalType[] projectedTypes,
+			ColumnBatchFactory batchFactory,
+			int batchSize,
+			boolean isUtcTimestamp,
+			boolean isCaseSensitive) {
+		Preconditions.checkArgument(
+				projectedFields.length == projectedTypes.length,
+				"The length(%s) of projectedFields should equal to the length(%s) projectedTypes",
+				projectedFields.length,
+				projectedTypes.length);
+
+		this.hadoopConfig = hadoopConfig;
+		this.projectedFields = projectedFields;
+		this.projectedTypes = projectedTypes;
+		this.batchFactory = batchFactory;
+		this.batchSize = batchSize;
+		this.isUtcTimestamp = isUtcTimestamp;
+		this.isCaseSensitive = isCaseSensitive;
+	}
+
+	@Override
+	public ParquetReader createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException {
+		org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(filePath.toUri());
+		ParquetMetadata footer = readFooter(
+				hadoopConfig.conf(), hadoopPath, range(splitOffset, splitOffset + splitLength));
+		MessageType fileSchema = footer.getFileMetaData().getSchema();
+		FilterCompat.Filter filter = getFilter(hadoopConfig.conf());
+		List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+
+		MessageType requestedSchema = clipParquetSchema(fileSchema);
+		ParquetFileReader reader = new ParquetFileReader(
+				hadoopConfig.conf(),
+				footer.getFileMetaData(),
+				hadoopPath,
+				blocks,
+				requestedSchema.getColumns());
+
+		long totalRowCount = 0;
+		for (BlockMetaData block : blocks) {
+			totalRowCount += block.getRowCount();
+		}
+
+		checkSchema(fileSchema, requestedSchema);
+
+		final int numBatchesToCirculate = config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY) + 1;
+		final Pool<ParquetReaderBatch<T>> poolOfBatches =
+				createPoolOfBatches(filePath, requestedSchema, numBatchesToCirculate);
+
+		return new ParquetReader(reader, requestedSchema, totalRowCount, poolOfBatches);
+	}
+
+	@Override
+	public ParquetReader restoreReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength,
+			CheckpointedPosition checkpointedPosition) throws IOException {
+		ParquetReader reader = createReader(config, filePath, splitOffset, splitLength);
+		// Offset is record count too.
+		reader.seek(checkpointedPosition.getOffset() + checkpointedPosition.getRecordsAfterOffset());

Review comment:
       -1 can be used to represent `NO_OFFSET`, will that cause a problem here?

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
+import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;
+import org.apache.flink.formats.parquet.vector.ParquetDecimalVector;
+import org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createColumnReader;
+import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Parquet {@link BulkFormat} that reads data from the file to {@link VectorizedColumnBatch} in
+ * vectorized mode.
+ */
+public abstract class ParquetVectorizedInputFormat<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableConfiguration hadoopConfig;
+	private final String[] projectedFields;
+	private final LogicalType[] projectedTypes;
+	private final ColumnBatchFactory batchFactory;
+	private final int batchSize;
+	private final boolean isUtcTimestamp;
+	private final boolean isCaseSensitive;
+
+	public ParquetVectorizedInputFormat(
+			SerializableConfiguration hadoopConfig,
+			String[] projectedFields,
+			LogicalType[] projectedTypes,
+			ColumnBatchFactory batchFactory,
+			int batchSize,
+			boolean isUtcTimestamp,
+			boolean isCaseSensitive) {
+		Preconditions.checkArgument(
+				projectedFields.length == projectedTypes.length,
+				"The length(%s) of projectedFields should equal to the length(%s) projectedTypes",
+				projectedFields.length,
+				projectedTypes.length);
+
+		this.hadoopConfig = hadoopConfig;
+		this.projectedFields = projectedFields;
+		this.projectedTypes = projectedTypes;
+		this.batchFactory = batchFactory;
+		this.batchSize = batchSize;
+		this.isUtcTimestamp = isUtcTimestamp;
+		this.isCaseSensitive = isCaseSensitive;
+	}
+
+	@Override
+	public ParquetReader createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException {
+		org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(filePath.toUri());
+		ParquetMetadata footer = readFooter(
+				hadoopConfig.conf(), hadoopPath, range(splitOffset, splitOffset + splitLength));
+		MessageType fileSchema = footer.getFileMetaData().getSchema();
+		FilterCompat.Filter filter = getFilter(hadoopConfig.conf());
+		List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+
+		MessageType requestedSchema = clipParquetSchema(fileSchema);
+		ParquetFileReader reader = new ParquetFileReader(
+				hadoopConfig.conf(),
+				footer.getFileMetaData(),
+				hadoopPath,
+				blocks,
+				requestedSchema.getColumns());
+
+		long totalRowCount = 0;
+		for (BlockMetaData block : blocks) {
+			totalRowCount += block.getRowCount();
+		}
+
+		checkSchema(fileSchema, requestedSchema);
+
+		final int numBatchesToCirculate = config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY) + 1;
+		final Pool<ParquetReaderBatch<T>> poolOfBatches =
+				createPoolOfBatches(filePath, requestedSchema, numBatchesToCirculate);
+
+		return new ParquetReader(reader, requestedSchema, totalRowCount, poolOfBatches);
+	}
+
+	@Override
+	public ParquetReader restoreReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength,
+			CheckpointedPosition checkpointedPosition) throws IOException {
+		ParquetReader reader = createReader(config, filePath, splitOffset, splitLength);
+		// Offset is record count too.
+		reader.seek(checkpointedPosition.getOffset() + checkpointedPosition.getRecordsAfterOffset());
+		return reader;
+	}
+
+	@Override
+	public boolean isSplittable() {
+		return true;
+	}
+
+	/**
+	 * Clips `parquetSchema` according to `fieldNames`.
+	 */
+	private MessageType clipParquetSchema(GroupType parquetSchema) {
+		Type[] types = new Type[projectedFields.length];
+		if (isCaseSensitive) {
+			for (int i = 0; i < projectedFields.length; ++i) {
+				String fieldName = projectedFields[i];
+				if (parquetSchema.getFieldIndex(fieldName) < 0) {
+					throw new IllegalArgumentException(fieldName + " does not exist");
+				}
+				types[i] = parquetSchema.getType(fieldName);
+			}
+		} else {
+			Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
+			for (Type type : parquetSchema.getFields()) {
+				caseInsensitiveFieldMap.compute(type.getName().toLowerCase(Locale.ROOT),
+						(key, previousType) -> {
+							if (previousType != null) {
+								throw new FlinkRuntimeException(
+										"Parquet with case insensitive mode should have no duplicate key: " + key);
+							}
+							return type;
+						});
+			}
+			for (int i = 0; i < projectedFields.length; ++i) {
+				Type type = caseInsensitiveFieldMap.get(projectedFields[i].toLowerCase(Locale.ROOT));
+				if (type == null) {
+					throw new IllegalArgumentException(projectedFields[i] + " does not exist");
+				}
+				// TODO clip for array,map,row types.
+				types[i] = type;
+			}
+		}
+
+		return Types.buildMessage().addFields(types).named("flink-parquet");
+	}
+
+	private void checkSchema(
+			MessageType fileSchema,
+			MessageType requestedSchema) throws IOException, UnsupportedOperationException {
+		if (projectedFields.length != requestedSchema.getFieldCount()) {
+			throw new RuntimeException("The quality of field type is incompatible with the request schema!");
+		}
+
+		/*
+		 * Check that the requested schema is supported.
+		 */
+		for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
+			Type t = requestedSchema.getFields().get(i);
+			if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
+				throw new UnsupportedOperationException("Complex types not supported.");
+			}
+
+			String[] colPath = requestedSchema.getPaths().get(i);
+			if (fileSchema.containsPath(colPath)) {
+				ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+				if (!fd.equals(requestedSchema.getColumns().get(i))) {
+					throw new UnsupportedOperationException("Schema evolution not supported.");
+				}
+			} else {
+				if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+					// Column is missing in data but the required data is non-nullable. This file is invalid.
+					throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
+				}
+			}
+		}
+	}
+
+	private Pool<ParquetReaderBatch<T>> createPoolOfBatches(
+			Path filePath, MessageType requestedSchema, int numBatches) {
+		final Pool<ParquetReaderBatch<T>> pool = new Pool<>(numBatches);
+
+		for (int i = 0; i < numBatches; i++) {
+			pool.add(createReaderBatch(filePath, requestedSchema, pool.recycler()));
+		}
+
+		return pool;
+	}
+
+	private ParquetReaderBatch<T> createReaderBatch(
+			Path filePath,
+			MessageType requestedSchema,
+			Pool.Recycler<ParquetReaderBatch<T>> recycler) {
+		WritableColumnVector[] writableVectors = createWritableVectors(requestedSchema);
+		VectorizedColumnBatch columnarBatch =
+				batchFactory.create(filePath, createReadableVectors(writableVectors));
+		return createReaderBatch(writableVectors, columnarBatch, recycler);
+	}
+
+	private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
+		WritableColumnVector[] columns = new WritableColumnVector[projectedTypes.length];
+		for (int i = 0; i < projectedTypes.length; i++) {
+			columns[i] = createWritableColumnVector(
+					batchSize,
+					projectedTypes[i],
+					requestedSchema.getColumns().get(i).getPrimitiveType());
+		}
+		return columns;
+	}
+
+	/**
+	 * Create readable vectors from writable vectors.
+	 * Especially for decimal, see {@link ParquetDecimalVector}.
+	 */
+	private ColumnVector[] createReadableVectors(WritableColumnVector[] writableVectors) {
+		ColumnVector[] vectors = new ColumnVector[writableVectors.length];
+		for (int i = 0; i < writableVectors.length; i++) {
+			vectors[i] = projectedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL ?
+					new ParquetDecimalVector(writableVectors[i]) :
+					writableVectors[i];
+		}
+		return vectors;
+	}
+
+	private class ParquetReader implements BulkFormat.Reader<T> {
+
+		private ParquetFileReader reader;
+
+		private final MessageType requestedSchema;
+
+		/**
+		 * The total number of rows this RecordReader will eventually read. The sum of the rows of all
+		 * the row groups.
+		 */
+		private final long totalRowCount;
+
+		private final Pool<ParquetReaderBatch<T>> pool;
+
+		/**
+		 * The number of rows that have been returned.
+		 */
+		private long rowsReturned;
+
+		/**
+		 * The number of rows that have been reading, including the current in flight row group.
+		 */
+		private long totalCountLoadedSoFar;
+
+		/**
+		 * For each request column, the reader to read this column. This is NULL if this column is
+		 * missing from the file, in which case we populate the attribute with NULL.
+		 */
+		@SuppressWarnings("rawtypes")
+		private ColumnReader[] columnReaders;
+
+		private long recordsToSkip;
+
+		private ParquetReader(
+				ParquetFileReader reader,
+				MessageType requestedSchema,
+				long totalRowCount,
+				Pool<ParquetReaderBatch<T>> pool) {
+			this.reader = reader;
+			this.requestedSchema = requestedSchema;
+			this.totalRowCount = totalRowCount;
+			this.pool = pool;
+			this.rowsReturned = 0;
+			this.totalCountLoadedSoFar = 0;
+			this.recordsToSkip = 0;
+		}
+
+		@Nullable
+		@Override
+		public RecordIterator<T> readBatch() throws IOException {
+			final ParquetReaderBatch<T> batch = getCachedEntry();
+
+			final long startingRowNum = rowsReturned;
+			if (!nextBatch(batch)) {
+				batch.recycle();
+				return null;
+			}
+
+			final RecordIterator<T> records = batch.convertAndGetIterator(startingRowNum);
+			if (recordsToSkip > 0) {

Review comment:
       Seems this check is unnecessary.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org