Posted to commits@parquet.apache.org by ti...@apache.org on 2014/09/05 20:33:08 UTC
[2/2] git commit: PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.
PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.
This will improve reading big datasets with a large schema (thousands of columns).
Instead, rowgroup metadata can be read in the tasks, where each task reads only the metadata of the file it's reading.
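For example, a job can opt into task-side metadata through the new ParquetInputFormat switch added in this patch (a minimal sketch; the job setup around it is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import parquet.hadoop.ParquetInputFormat;

    public class TaskSideMetadataExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf); // hypothetical job setup
        // With task-side metadata enabled, the client no longer loads rowgroup
        // metadata; each task reads only the footer of the file it processes.
        ParquetInputFormat.setTaskSideMetaData(job, true);
        // Equivalent, via the config key this patch introduces:
        job.getConfiguration().setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
      }
    }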
Author: julien <ju...@twitter.com>
Closes #45 from julienledem/skip_reading_row_groups and squashes the following commits:
ccdd08c [julien] fix parquet-hive
24a2050 [julien] Merge branch 'master' into skip_reading_row_groups
3d7e35a [julien] address review feedback
5b6bd1b [julien] more tests
323d254 [julien] add unit tests
f599259 [julien] review feedback
fb11f02 [julien] fix backward compatibility check
2c20b46 [julien] cleanup readFooters methods
3da37d8 [julien] fix read summary
ab95a45 [julien] cleanup
4d16df3 [julien] implement task side metadata
9bb8059 [julien] first stab at integrating skipping row groups
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/5dafd127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/5dafd127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/5dafd127
Branch: refs/heads/master
Commit: 5dafd127f3de7c516ce9c1b7329087a01ab2fc57
Parents: 647b8a7
Author: julien <ju...@twitter.com>
Authored: Fri Sep 5 11:32:46 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Fri Sep 5 11:32:46 2014 -0700
----------------------------------------------------------------------
.../converter/ParquetMetadataConverter.java | 181 +++++-
.../java/parquet/hadoop/ParquetFileReader.java | 213 +++++--
.../java/parquet/hadoop/ParquetFileWriter.java | 31 +-
.../java/parquet/hadoop/ParquetInputFormat.java | 638 ++++++++++++-------
.../java/parquet/hadoop/ParquetInputSplit.java | 340 ++++------
.../parquet/hadoop/ParquetOutputCommitter.java | 1 -
.../parquet/hadoop/ParquetOutputFormat.java | 1 -
.../main/java/parquet/hadoop/ParquetReader.java | 6 +-
.../parquet/hadoop/ParquetRecordReader.java | 70 +-
.../main/java/parquet/hadoop/PrintFooter.java | 3 +-
.../mapred/DeprecatedParquetInputFormat.java | 30 +-
.../mapred/DeprecatedParquetOutputFormat.java | 17 +-
.../converter/TestParquetMetadataConverter.java | 128 +++-
.../hadoop/DeprecatedInputFormatTest.java | 2 +-
.../java/parquet/hadoop/TestInputFormat.java | 235 ++++++-
.../parquet/hadoop/TestParquetFileWriter.java | 14 +-
.../parquet/hadoop/TestParquetInputSplit.java | 45 --
.../hadoop/example/TestInputOutputFormat.java | 24 +-
.../read/ParquetRecordReaderWrapper.java | 51 +-
.../main/java/parquet/pig/ParquetLoader.java | 38 +-
.../main/java/parquet/pig/TupleReadSupport.java | 14 +-
pom.xml | 3 +-
22 files changed, 1361 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 5bd6869..76834d5 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -32,6 +32,7 @@ import java.util.Set;
import parquet.Log;
import parquet.common.schema.ColumnPath;
import parquet.format.ColumnChunk;
+import parquet.format.ColumnMetaData;
import parquet.format.ConvertedType;
import parquet.format.DataPageHeader;
import parquet.format.DictionaryPageHeader;
@@ -58,7 +59,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;
import parquet.schema.TypeVisitor;
import parquet.schema.Types;
-
+import static java.lang.Math.min;
import static parquet.format.Util.readFileMetaData;
import static parquet.format.Util.writePageHeader;
@@ -340,8 +341,124 @@ public class ParquetMetadataConverter {
fileMetaData.addToKey_value_metadata(keyValue);
}
+ private static interface MetadataFilterVisitor<T, E extends Throwable> {
+ T visit(NoFilter filter) throws E;
+ T visit(SkipMetadataFilter filter) throws E;
+ T visit(RangeMetadataFilter filter) throws E;
+ }
+
+ public abstract static class MetadataFilter {
+ private MetadataFilter() {}
+ abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
+ }
+ public static final MetadataFilter NO_FILTER = new NoFilter();
+ public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
+ /**
+ * [ startOffset, endOffset )
+ * @param startOffset
+ * @param endOffset
+ * @return the filter
+ */
+ public static final MetadataFilter range(long startOffset, long endOffset) {
+ return new RangeMetadataFilter(startOffset, endOffset);
+ }
+ private static final class NoFilter extends MetadataFilter {
+ private NoFilter() {}
+ @Override
+ <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+ return visitor.visit(this);
+ }
+ @Override
+ public String toString() {
+ return "NO_FILTER";
+ }
+ }
+ private static final class SkipMetadataFilter extends MetadataFilter {
+ private SkipMetadataFilter() {}
+ @Override
+ <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+ return visitor.visit(this);
+ }
+ @Override
+ public String toString() {
+ return "SKIP_ROW_GROUPS";
+ }
+ }
+ /**
+ * [ startOffset, endOffset )
+ * @author Julien Le Dem
+ */
+ static final class RangeMetadataFilter extends MetadataFilter {
+ final long startOffset;
+ final long endOffset;
+ RangeMetadataFilter(long startOffset, long endOffset) {
+ super();
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+ @Override
+ <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+ return visitor.visit(this);
+ }
+ boolean contains(long offset) {
+ return offset >= this.startOffset && offset < this.endOffset;
+ }
+ @Override
+ public String toString() {
+ return "range(s:" + startOffset + ", e:" + endOffset + ")";
+ }
+ }
+
+ @Deprecated
public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
- FileMetaData fileMetaData = readFileMetaData(from);
+ return readParquetMetadata(from, NO_FILTER);
+ }
+
+ static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
+ List<RowGroup> rowGroups = metaData.getRow_groups();
+ List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+ for (RowGroup rowGroup : rowGroups) {
+ long totalSize = 0;
+ long startIndex = getOffset(rowGroup.getColumns().get(0));
+ for (ColumnChunk col : rowGroup.getColumns()) {
+ totalSize += col.getMeta_data().getTotal_compressed_size();
+ }
+ long midPoint = startIndex + totalSize / 2;
+ if (filter.contains(midPoint)) {
+ newRowGroups.add(rowGroup);
+ }
+ }
+ metaData.setRow_groups(newRowGroups);
+ return metaData;
+ }
+
+ static long getOffset(RowGroup rowGroup) {
+ return getOffset(rowGroup.getColumns().get(0));
+ }
+ static long getOffset(ColumnChunk columnChunk) {
+ ColumnMetaData md = columnChunk.getMeta_data();
+ long offset = md.getData_page_offset();
+ if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
+ offset = md.getDictionary_page_offset();
+ }
+ return offset;
+ }
+
+ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+ FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+ @Override
+ public FileMetaData visit(NoFilter filter) throws IOException {
+ return readFileMetaData(from);
+ }
+ @Override
+ public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
+ return readFileMetaData(from, true);
+ }
+ @Override
+ public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
+ return filterFileMetaData(readFileMetaData(from), filter);
+ }
+ });
if (Log.DEBUG) LOG.debug(fileMetaData);
ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
@@ -352,37 +469,39 @@ public class ParquetMetadataConverter {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
- for (RowGroup rowGroup : row_groups) {
- BlockMetaData blockMetaData = new BlockMetaData();
- blockMetaData.setRowCount(rowGroup.getNum_rows());
- blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
- List<ColumnChunk> columns = rowGroup.getColumns();
- String filePath = columns.get(0).getFile_path();
- for (ColumnChunk columnChunk : columns) {
- if ((filePath == null && columnChunk.getFile_path() != null)
- || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
- throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+ if (row_groups != null) {
+ for (RowGroup rowGroup : row_groups) {
+ BlockMetaData blockMetaData = new BlockMetaData();
+ blockMetaData.setRowCount(rowGroup.getNum_rows());
+ blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+ List<ColumnChunk> columns = rowGroup.getColumns();
+ String filePath = columns.get(0).getFile_path();
+ for (ColumnChunk columnChunk : columns) {
+ if ((filePath == null && columnChunk.getFile_path() != null)
+ || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
+ throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+ }
+ parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
+ ColumnPath path = getPath(metaData);
+ ColumnChunkMetaData column = ColumnChunkMetaData.get(
+ path,
+ messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
+ CompressionCodecName.fromParquet(metaData.codec),
+ fromFormatEncodings(metaData.encodings),
+ fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+ metaData.data_page_offset,
+ metaData.dictionary_page_offset,
+ metaData.num_values,
+ metaData.total_compressed_size,
+ metaData.total_uncompressed_size);
+ // TODO
+ // index_page_offset
+ // key_value_metadata
+ blockMetaData.addColumn(column);
}
- parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
- ColumnPath path = getPath(metaData);
- ColumnChunkMetaData column = ColumnChunkMetaData.get(
- path,
- messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
- CompressionCodecName.fromParquet(metaData.codec),
- fromFormatEncodings(metaData.encodings),
- fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
- metaData.data_page_offset,
- metaData.dictionary_page_offset,
- metaData.num_values,
- metaData.total_compressed_size,
- metaData.total_uncompressed_size);
- // TODO
- // index_page_offset
- // key_value_metadata
- blockMetaData.addColumn(column);
+ blockMetaData.setPath(filePath);
+ blocks.add(blockMetaData);
}
- blockMetaData.setPath(filePath);
- blocks.add(blockMetaData);
}
Map<String, String> keyValueMetaData = new HashMap<String, String>();
List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
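To illustrate the MetadataFilter API introduced above, a minimal sketch of reading footer metadata through a filter (the stream handling around it is hypothetical; the caller must position 'in' at the start of the serialized footer):

    import java.io.IOException;
    import java.io.InputStream;
    import parquet.format.converter.ParquetMetadataConverter;
    import parquet.hadoop.metadata.ParquetMetadata;

    class FooterFilterSketch {
      // 'in' must already be positioned at the start of the serialized footer.
      static ParquetMetadata readSchemaOnly(InputStream in) throws IOException {
        ParquetMetadataConverter converter = new ParquetMetadataConverter();
        // SKIP_ROW_GROUPS drops all rowgroup metadata; range(start, end) would
        // instead keep only rowgroups whose midpoint falls in [start, end).
        return converter.readParquetMetadata(in, ParquetMetadataConverter.SKIP_ROW_GROUPS);
      }
    }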
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index e660c9f..49f1fab 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,13 +15,21 @@
*/
package parquet.hadoop;
+import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static parquet.hadoop.ParquetFileWriter.*;
+
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -39,6 +47,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.Utils;
import parquet.Log;
@@ -51,6 +60,7 @@ import parquet.common.schema.ColumnPath;
import parquet.format.PageHeader;
import parquet.format.Util;
import parquet.format.converter.ParquetMetadataConverter;
+import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import parquet.hadoop.CodecFactory.BytesDecompressor;
import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
import parquet.hadoop.metadata.BlockMetaData;
@@ -59,11 +69,6 @@ import parquet.hadoop.metadata.ParquetMetadata;
import parquet.hadoop.util.counters.BenchmarkCounter;
import parquet.io.ParquetDecodingException;
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
/**
* Internal implementation of the Parquet file reader as a block container
*
@@ -84,7 +89,28 @@ public class ParquetFileReader implements Closeable {
* @return the footers for those files using the summary file if possible.
* @throws IOException
*/
- public static List<Footer> readAllFootersInParallelUsingSummaryFiles(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+ @Deprecated
+ public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
+ return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
+ }
+
+ private static MetadataFilter filter(boolean skipRowGroups) {
+ return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
+ }
+
+ /**
+ * for files provided, check if there's a summary file.
+ * If a summary file is found it is used otherwise the file footer is used.
+ * @param configuration the hadoop conf to connect to the file system;
+ * @param partFiles the part files to read
+ * @param skipRowGroups to skipRowGroups in the footers
+ * @return the footers for those files using the summary file if possible.
+ * @throws IOException
+ */
+ public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
+ final Configuration configuration,
+ final Collection<FileStatus> partFiles,
+ final boolean skipRowGroups) throws IOException {
// figure out list of all parents to part files
Set<Path> parents = new HashSet<Path>();
@@ -98,12 +124,17 @@ public class ParquetFileReader implements Closeable {
summaries.add(new Callable<Map<Path, Footer>>() {
@Override
public Map<Path, Footer> call() throws Exception {
- // fileSystem is thread-safe
- FileSystem fileSystem = path.getFileSystem(configuration);
- Path summaryFile = new Path(path, PARQUET_METADATA_FILE);
- if (fileSystem.exists(summaryFile)) {
- if (Log.INFO) LOG.info("reading summary file: " + summaryFile);
- final List<Footer> footers = readSummaryFile(configuration, fileSystem.getFileStatus(summaryFile));
+ ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
+ if (mergedMetadata != null) {
+ final List<Footer> footers;
+ if (skipRowGroups) {
+ footers = new ArrayList<Footer>();
+ for (FileStatus f : partFiles) {
+ footers.add(new Footer(f.getPath(), mergedMetadata));
+ }
+ } else {
+ footers = footersFromSummaryFile(path, mergedMetadata);
+ }
Map<Path, Footer> map = new HashMap<Path, Footer>();
for (Footer footer : footers) {
// the folder may have been moved
@@ -143,7 +174,7 @@ public class ParquetFileReader implements Closeable {
if (toRead.size() > 0) {
// read the footers of the files that did not have a summary file
if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
- result.addAll(readAllFootersInParallel(configuration, toRead));
+ result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
}
return result;
@@ -170,14 +201,28 @@ public class ParquetFileReader implements Closeable {
}
}
+ @Deprecated
public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+ return readAllFootersInParallel(configuration, partFiles, false);
+ }
+
+ /**
+ * read all the footers of the files provided
+ * (not using summary files)
+ * @param configuration the conf to access the File System
+ * @param partFiles the files to read
+ * @param skipRowGroups to skip the rowGroup info
+ * @return the footers
+ * @throws IOException
+ */
+ public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
for (final FileStatus currentFile : partFiles) {
footers.add(new Callable<Footer>() {
@Override
public Footer call() throws Exception {
try {
- return new Footer(currentFile.getPath(), ParquetFileReader.readFooter(configuration, currentFile));
+ return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
} catch (IOException e) {
throw new IOException("Could not read footer for file " + currentFile, e);
}
@@ -191,38 +236,103 @@ public class ParquetFileReader implements Closeable {
}
}
+ /**
+ * Read the footers of all the files under that path (recursively)
+ * not using summary files.
+ * rowGroups are not skipped
+ * @param configuration the configuration to access the FS
+ * @param fileStatus the root dir
+ * @return all the footers
+ * @throws IOException
+ */
public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
- final FileSystem fs = fileStatus.getPath().getFileSystem(configuration);
- List<FileStatus> statuses;
- if (fileStatus.isDir()) {
- statuses = Arrays.asList(fs.listStatus(fileStatus.getPath(), new Utils.OutputFileUtils.OutputFilesFilter()));
- } else {
- statuses = new ArrayList<FileStatus>();
- statuses.add(fileStatus);
- }
- return readAllFootersInParallel(configuration, statuses);
+ List<FileStatus> statuses = listFiles(configuration, fileStatus);
+ return readAllFootersInParallel(configuration, statuses, false);
}
+ @Deprecated
+ public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
+ return readFooters(configuration, status(configuration, path));
+ }
+
+ private static FileStatus status(Configuration configuration, Path path) throws IOException {
+ return path.getFileSystem(configuration).getFileStatus(path);
+ }
+
+ /**
+ * this always returns the row groups
+ * @param configuration
+ * @param pathStatus
+ * @return
+ * @throws IOException
+ */
+ @Deprecated
public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
- try {
- if (pathStatus.isDir()) {
- Path summaryPath = new Path(pathStatus.getPath(), PARQUET_METADATA_FILE);
- FileSystem fs = summaryPath.getFileSystem(configuration);
- if (fs.exists(summaryPath)) {
- FileStatus summaryStatus = fs.getFileStatus(summaryPath);
- return readSummaryFile(configuration, summaryStatus);
+ return readFooters(configuration, pathStatus, false);
+ }
+
+ /**
+ * Read the footers of all the files under that path (recursively)
+ * using summary files if possible
+ * @param configuration the configuration to access the FS
+ * @param fileStatus the root dir
+ * @return all the footers
+ * @throws IOException
+ */
+ public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
+ List<FileStatus> files = listFiles(configuration, pathStatus);
+ return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
+ }
+
+ private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
+ if (fileStatus.isDir()) {
+ FileSystem fs = fileStatus.getPath().getFileSystem(conf);
+ FileStatus[] list = fs.listStatus(fileStatus.getPath(), new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ return !p.getName().startsWith("_") && !p.getName().startsWith(".");
}
+ });
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus sub : list) {
+ result.addAll(listFiles(conf, sub));
}
- } catch (IOException e) {
- LOG.warn("can not read summary file for " + pathStatus.getPath(), e);
+ return result;
+ } else {
+ return Arrays.asList(fileStatus);
}
- return readAllFootersInParallel(configuration, pathStatus);
-
}
+ /**
+ * Specifically reads a given summary file
+ * @param configuration
+ * @param summaryStatus
+ * @return the metadata translated for each file
+ * @throws IOException
+ */
public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
final Path parent = summaryStatus.getPath().getParent();
- ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus);
+ ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
+ return footersFromSummaryFile(parent, mergedFooters);
+ }
+
+ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
+ Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
+ Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
+ FileSystem fileSystem = basePath.getFileSystem(configuration);
+ if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
+ // reading the summary file that does not contain the row groups
+ if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
+ return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
+ } else if (fileSystem.exists(metadataFile)) {
+ if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
+ return readFooter(configuration, metadataFile, filter(skipRowGroups));
+ } else {
+ return null;
+ }
+ }
+
+ static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
List<BlockMetaData> blocks = mergedFooters.getBlocks();
for (BlockMetaData block : blocks) {
@@ -249,25 +359,42 @@ public class ParquetFileReader implements Closeable {
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
+ @Deprecated
public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
- FileSystem fileSystem = file.getFileSystem(configuration);
- return readFooter(configuration, fileSystem.getFileStatus(file));
+ return readFooter(configuration, file, NO_FILTER);
}
-
- public static final List<Footer> readFooters(Configuration configuration, Path file) throws IOException {
+ /**
+ * Reads the meta data in the footer of the file.
+ * Skipping row groups (or not) based on the provided filter
+ * @param configuration
+ * @param file the Parquet File
+ * @param filter the filter to apply to row groups
+ * @return the metadata with row groups filtered.
+ * @throws IOException if an error occurs while reading the file
+ */
+ public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getFileSystem(configuration);
- return readFooters(configuration, fileSystem.getFileStatus(file));
+ return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+ }
+
+ /**
+ * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+ */
+ @Deprecated
+ public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+ return readFooter(configuration, file, NO_FILTER);
}
/**
* Reads the meta data block in the footer of the file
* @param configuration
* @param file the parquet File
+ * @param filter the filter to apply to row groups
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
- public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+ public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
FSDataInputStream f = fileSystem.open(file.getPath());
try {
@@ -293,7 +420,7 @@ public class ParquetFileReader implements Closeable {
throw new RuntimeException("corrupted file: the footer index is not within the file");
}
f.seek(footerIndex);
- return parquetMetadataConverter.readParquetMetadata(f);
+ return parquetMetadataConverter.readParquetMetadata(f, filter);
} finally {
f.close();
}
@@ -430,7 +557,7 @@ public class ParquetFileReader implements Closeable {
this.readAsBytesInput(pageHeader.compressed_page_size),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
- parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
+ ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
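Putting the new ParquetFileReader entry point together, a minimal sketch (the configuration and path are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.hadoop.ParquetFileReader;
    import parquet.hadoop.metadata.ParquetMetadata;
    import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;

    class ReadFooterSketch {
      static ParquetMetadata footerWithoutRowGroups(Configuration conf, Path file)
          throws IOException {
        // The old readFooter(Configuration, Path) is now deprecated and delegates
        // here with NO_FILTER; SKIP_ROW_GROUPS skips the rowgroup metadata.
        return ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS);
      }
    }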
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index f3ef61b..42d91a4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -15,6 +15,9 @@
*/
package parquet.hadoop;
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -49,9 +52,6 @@ import parquet.io.ParquetEncodingException;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
/**
* Internal implementation of the Parquet file writer as a block container
*
@@ -62,6 +62,7 @@ public class ParquetFileWriter {
private static final Log LOG = Log.getLog(ParquetFileWriter.class);
public static final String PARQUET_METADATA_FILE = "_metadata";
+ public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
public static final int CURRENT_VERSION = 1;
@@ -83,7 +84,7 @@ public class ParquetFileWriter {
private long currentChunkFirstDataPage;
private long currentChunkDictionaryPageOffset;
private long currentChunkValueCount;
-
+
private Statistics currentStatistics;
/**
@@ -387,19 +388,26 @@ public class ParquetFileWriter {
}
/**
- * writes a _metadata file
+ * writes a _metadata and _common_metadata file
* @param configuration the configuration to use to get the FileSystem
* @param outputPath the directory to write the _metadata file to
* @param footers the list of footers to merge
* @throws IOException
*/
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
- Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+ ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
FileSystem fs = outputPath.getFileSystem(configuration);
outputPath = outputPath.makeQualified(fs);
+ writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+ metadataFooter.getBlocks().clear();
+ writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+ }
+
+ private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+ throws IOException {
+ Path metaDataPath = new Path(outputPath, parquetMetadataFile);
FSDataOutputStream metadata = fs.create(metaDataPath);
metadata.write(MAGIC);
- ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
serializeFooter(metadataFooter, metadata);
metadata.close();
}
@@ -439,11 +447,10 @@ public class ParquetFileWriter {
* @param footers the list files footers to merge
* @return the global meta data for all the footers
*/
-
static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
return getGlobalMetaData(footers, true);
}
-
+
static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
GlobalMetaData fileMetaData = null;
for (Footer footer : footers) {
@@ -464,7 +471,7 @@ public class ParquetFileWriter {
GlobalMetaData mergedMetadata) {
return mergeInto(toMerge, mergedMetadata, true);
}
-
+
static GlobalMetaData mergeInto(
FileMetaData toMerge,
GlobalMetaData mergedMetadata,
@@ -505,7 +512,7 @@ public class ParquetFileWriter {
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
return mergeInto(toMerge, mergedSchema, true);
}
-
+
/**
* will return the result of merging toMerge into mergedSchema
* @param toMerge the schema to merge into mergedSchema
@@ -517,7 +524,7 @@ public class ParquetFileWriter {
if (mergedSchema == null) {
return toMerge;
}
-
+
return mergedSchema.union(toMerge, strict);
}
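A minimal sketch of the writer-side effect: one call now produces both summary files (the output path and footer list are hypothetical):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.hadoop.Footer;
    import parquet.hadoop.ParquetFileWriter;

    class WriteSummarySketch {
      static void writeSummaries(Configuration conf, Path outputPath, List<Footer> footers)
          throws IOException {
        // Writes _metadata (merged footers including rowgroups) and, new in this
        // patch, _common_metadata (the same footer with the rowgroups cleared).
        ParquetFileWriter.writeMetadataFile(conf, outputPath, footers);
      }
    }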
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 0231ccd..d79ca51 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -15,9 +15,12 @@
*/
package parquet.hadoop;
+import static parquet.Preconditions.checkArgument;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -62,8 +65,6 @@ import parquet.io.ParquetDecodingException;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
-import static parquet.Preconditions.checkArgument;
-
/**
* The input format to read a Parquet file.
*
@@ -89,7 +90,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* key to configure the filter
*/
public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
-
+
/**
* key to configure type checking for conflicting schemas (default: true)
*/
@@ -100,11 +101,17 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
+ public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
+
private static final int MIN_FOOTER_CACHE_SIZE = 100;
- private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+ public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
+ ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
+ }
- private Class<?> readSupportClass;
+ public static boolean isTaskSideMetaData(Configuration configuration) {
+ return configuration.getBoolean(TASK_SIDE_METADATA, Boolean.FALSE);
+ }
public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
@@ -181,6 +188,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
}
+ private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+
+ private Class<?> readSupportClass;
+
/**
* Hadoop will instantiate using this constructor
*/
@@ -202,11 +213,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
public RecordReader<Void, T> createRecordReader(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
- ReadSupport<T> readSupport = getReadSupport(ContextUtil.getConfiguration(taskAttemptContext));
-
Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
-
+ ReadSupport<T> readSupport = getReadSupport(conf);
return new ParquetRecordReader<T>(readSupport, getFilter(conf));
}
@@ -217,6 +225,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
public ReadSupport<T> getReadSupport(Configuration configuration){
try {
if (readSupportClass == null) {
+ // TODO: fix this weird caching independent of the conf parameter
readSupportClass = getReadSupportClass(configuration);
}
return (ReadSupport<T>)readSupportClass.newInstance();
@@ -227,195 +236,13 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
}
}
- //Wrapper of hdfs blocks, keep track of which HDFS block is being used
- private static class HDFSBlocks {
- BlockLocation[] hdfsBlocks;
- int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
- int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
-
- private HDFSBlocks(BlockLocation[] hdfsBlocks) {
- this.hdfsBlocks = hdfsBlocks;
- Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
- @Override
- public int compare(BlockLocation b1, BlockLocation b2) {
- return Long.signum(b1.getOffset() - b2.getOffset());
- }
- };
- Arrays.sort(hdfsBlocks, comparator);
- }
-
- private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
- BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
- return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
- }
-
- /**
- * @param rowGroupMetadata
- * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
- * return false if the mid point of row group is in the same hdfs block
- */
- private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
- boolean isNewHdfsBlock = false;
- long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
-
- //if mid point is not in the current HDFS block any more, return true
- while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
- isNewHdfsBlock = true;
- currentMidPointHDFSBlockIndex++;
- if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
- throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
- + rowGroupMidPoint
- + ", the end of the hdfs block is "
- + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
- }
-
- while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
- currentStartHdfsBlockIndex++;
- if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
- throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
- + rowGroupMetadata.getStartingPos()
- + " but the end of hdfs blocks of file is "
- + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
- }
- return isNewHdfsBlock;
- }
-
- public BlockLocation get(int hdfsBlockIndex) {
- return hdfsBlocks[hdfsBlockIndex];
- }
-
- public BlockLocation getCurrentBlock() {
- return hdfsBlocks[currentStartHdfsBlockIndex];
- }
- }
-
- private static class SplitInfo {
- List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
- BlockLocation hdfsBlock;
- long compressedByteSize = 0L;
-
- public SplitInfo(BlockLocation currentBlock) {
- this.hdfsBlock = currentBlock;
- }
-
- private void addRowGroup(BlockMetaData rowGroup) {
- this.rowGroups.add(rowGroup);
- this.compressedByteSize += rowGroup.getCompressedSize();
- }
-
- public long getCompressedByteSize() {
- return compressedByteSize;
- }
-
- public List<BlockMetaData> getRowGroups() {
- return rowGroups;
- }
-
- int getRowGroupCount() {
- return rowGroups.size();
- }
-
- public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, FileMetaData fileMetaData, String requestedSchema, Map<String, String> readSupportMetadata, String fileSchema) throws IOException {
- MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
- long length = 0;
-
- for (BlockMetaData block : this.getRowGroups()) {
- List<ColumnChunkMetaData> columns = block.getColumns();
- for (ColumnChunkMetaData column : columns) {
- if (requested.containsPath(column.getPath().toArray())) {
- length += column.getTotalSize();
- }
- }
- }
- return new ParquetInputSplit(
- fileStatus.getPath(),
- hdfsBlock.getOffset(),
- length,
- hdfsBlock.getHosts(),
- this.getRowGroups(),
- requestedSchema,
- fileSchema,
- fileMetaData.getKeyValueMetaData(),
- readSupportMetadata
- );
- }
- }
-
- /**
- * groups together all the data blocks for the same HDFS block
- *
- * @param rowGroupBlocks data blocks (row groups)
- * @param hdfsBlocksArray hdfs blocks
- * @param fileStatus the containing file
- * @param fileMetaData file level meta data
- * @param requestedSchema the schema requested by the user
- * @param readSupportMetadata the metadata provided by the readSupport implementation in init
- * @param minSplitSize the mapred.min.split.size
- * @param maxSplitSize the mapred.max.split.size
- * @return the splits (one per HDFS block)
- * @throws IOException If hosts can't be retrieved for the HDFS block
- */
- static <T> List<ParquetInputSplit> generateSplits(
- List<BlockMetaData> rowGroupBlocks,
- BlockLocation[] hdfsBlocksArray,
- FileStatus fileStatus,
- FileMetaData fileMetaData,
- String requestedSchema,
- Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
- if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
- }
- String fileSchema = fileMetaData.getSchema().toString().intern();
- HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
- hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
- SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-
- //assign rowGroups to splits
- List<SplitInfo> splitRowGroups = new ArrayList<SplitInfo>();
- checkSorted(rowGroupBlocks);//assert row groups are sorted
- for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
- if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
- && currentSplit.getCompressedByteSize() >= minSplitSize
- && currentSplit.getCompressedByteSize() > 0)
- || currentSplit.getCompressedByteSize() >= maxSplitSize) {
- //create a new split
- splitRowGroups.add(currentSplit);//finish previous split
- currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
- }
- currentSplit.addRowGroup(rowGroupMetadata);
- }
-
- if (currentSplit.getRowGroupCount() > 0) {
- splitRowGroups.add(currentSplit);
- }
-
- //generate splits from rowGroups of each split
- List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
- for (SplitInfo splitInfo : splitRowGroups) {
- ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, fileMetaData, requestedSchema, readSupportMetadata, fileSchema);
- resultSplits.add(split);
- }
- return resultSplits;
- }
-
- private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
- long previousOffset = 0L;
- for(BlockMetaData rowGroup: rowGroupBlocks) {
- long currentOffset = rowGroup.getStartingPos();
- if (currentOffset < previousOffset) {
- throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
- }
- }
- }
-
/**
* {@inheritDoc}
*/
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
- List<InputSplit> splits = new ArrayList<InputSplit>();
- splits.addAll(getSplits(ContextUtil.getConfiguration(jobContext), getFooters(jobContext)));
- return splits;
+ Configuration configuration = ContextUtil.getConfiguration(jobContext);
+ return new ArrayList<InputSplit>(getSplits(configuration, getFooters(jobContext)));
}
/**
@@ -425,63 +252,20 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* @throws IOException
*/
public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
+ boolean taskSideMetaData = isTaskSideMetaData(configuration);
+ boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
if (maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
+ throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
}
- List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
- GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
+ GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
ReadContext readContext = getReadSupport(configuration).init(new InitContext(
configuration,
globalMetaData.getKeyValueMetaData(),
globalMetaData.getSchema()));
- Filter filter = getFilter(configuration);
-
- long rowGroupsDropped = 0;
- long totalRowGroups = 0;
-
- for (Footer footer : footers) {
- final Path file = footer.getFile();
- LOG.debug(file);
- FileSystem fs = file.getFileSystem(configuration);
- FileStatus fileStatus = fs.getFileStatus(file);
- ParquetMetadata parquetMetaData = footer.getParquetMetadata();
- List<BlockMetaData> blocks = parquetMetaData.getBlocks();
-
- List<BlockMetaData> filteredBlocks = blocks;
-
- totalRowGroups += blocks.size();
- filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
- rowGroupsDropped += blocks.size() - filteredBlocks.size();
-
- if (filteredBlocks.isEmpty()) {
- continue;
- }
-
- BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
- splits.addAll(
- generateSplits(
- filteredBlocks,
- fileBlockLocations,
- fileStatus,
- parquetMetaData.getFileMetaData(),
- readContext.getRequestedSchema().toString(),
- readContext.getReadSupportMetadata(),
- minSplitSize,
- maxSplitSize)
- );
- }
-
- if (rowGroupsDropped > 0 && totalRowGroups > 0) {
- int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
- LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
- } else {
- LOG.info("There were no row groups that could be dropped due to filter predicates");
- }
-
- return splits;
+ return SplitStrategy.getSplitStrategy(taskSideMetaData).getSplits(configuration, footers, maxSplitSize, minSplitSize, readContext);
}
/*
@@ -539,7 +323,6 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
if (statuses.isEmpty()) {
return Collections.emptyList();
}
-
Configuration config = ContextUtil.getConfiguration(jobContext);
List<Footer> footers = new ArrayList<Footer>(statuses.size());
Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
@@ -575,8 +358,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
return footers;
}
- List<Footer> newFooters =
- getFooters(config, new ArrayList<FileStatus>(missingStatuses));
+ List<Footer> newFooters = getFooters(config, missingStatuses);
for (Footer newFooter : newFooters) {
// Use the original file status objects to make sure we store a
// conservative (older) modification time (i.e. in case the files and
@@ -590,6 +372,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
return footers;
}
+ public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+ return getFooters(configuration, (Collection<FileStatus>)statuses);
+ }
+
/**
* the footers for the files
* @param configuration to connect to the file system
@@ -597,9 +383,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* @return the footers of the files
* @throws IOException
*/
- public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+ public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
- return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
+ boolean taskSideMetaData = isTaskSideMetaData(configuration);
+ return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, taskSideMetaData);
}
/**
@@ -688,3 +475,362 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
}
}
+abstract class SplitStrategy {
+ private static final Log LOG = Log.getLog(SplitStrategy.class);
+
+ static SplitStrategy getSplitStrategy(boolean taskSideMetaData) {
+ if (taskSideMetaData) {
+ LOG.info("Using Task Side Metadata Split Strategy");
+ return new TaskSideMetadataSplitStrategy();
+ } else {
+ LOG.info("Using Client Side Metadata Split Strategy");
+ return new ClientSideMetadataSplitStrategy();
+ }
+ }
+
+ abstract List<ParquetInputSplit> getSplits(
+ Configuration configuration,
+ List<Footer> footers,
+ final long maxSplitSize, final long minSplitSize,
+ ReadContext readContext) throws IOException;
+}
+class TaskSideMetadataSplitStrategy extends SplitStrategy {
+
+ @Override
+ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+ long maxSplitSize, long minSplitSize, ReadContext readContext) throws IOException {
+ List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+ for (Footer footer : footers) {
+ // TODO: keep status in Footer
+ final Path file = footer.getFile();
+ FileSystem fs = file.getFileSystem(configuration);
+ FileStatus fileStatus = fs.getFileStatus(file);
+ BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ splits.addAll(generateTaskSideMDSplits(
+ fileBlockLocations,
+ fileStatus,
+ readContext.getRequestedSchema().toString(),
+ readContext.getReadSupportMetadata(),
+ minSplitSize,
+ maxSplitSize));
+
+ }
+ return splits;
+ }
+
+ private static int findBlockIndex(BlockLocation[] hdfsBlocksArray, long endOffset) {
+ for (int i = 0; i < hdfsBlocksArray.length; i++) {
+ BlockLocation block = hdfsBlocksArray[i];
+ // end offset is exclusive. We want the block that contains the point right before.
+ if (endOffset > block.getOffset() && endOffset <= (block.getOffset() + block.getLength())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ static <T> List<ParquetInputSplit> generateTaskSideMDSplits(
+ BlockLocation[] hdfsBlocksArray,
+ FileStatus fileStatus,
+ String requestedSchema,
+ Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+ if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+ throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+ }
+ //generate splits from rowGroups of each split
+ List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+ // [startOffset, endOffset)
+ long startOffset = 0;
+ long endOffset = 0;
+ // they should already be sorted
+ Arrays.sort(hdfsBlocksArray, new Comparator<BlockLocation>() {
+ @Override
+ public int compare(BlockLocation o1, BlockLocation o2) {
+ return compare(o1.getOffset(), o2.getOffset());
+ }
+ private int compare(long x, long y) {
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ });
+ final BlockLocation lastBlock =
+ hdfsBlocksArray.length == 0 ? null : hdfsBlocksArray[hdfsBlocksArray.length - 1];
+ while (endOffset < fileStatus.getLen()) {
+ startOffset = endOffset;
+ BlockLocation blockLocation;
+ final int nextBlockMin = findBlockIndex(hdfsBlocksArray, startOffset + minSplitSize);
+ final int nextBlockMax = findBlockIndex(hdfsBlocksArray, startOffset + maxSplitSize);
+ if (nextBlockMax == nextBlockMin && nextBlockMax != -1) {
+ // no block boundary between min and max
+ // => use max for the size of the split
+ endOffset = startOffset + maxSplitSize;
+ blockLocation = hdfsBlocksArray[nextBlockMax];
+ } else if (nextBlockMin > -1) {
+ // block boundary between min and max
+ // we end the split at the first block boundary
+ blockLocation = hdfsBlocksArray[nextBlockMin];
+ endOffset = blockLocation.getOffset() + blockLocation.getLength();
+ } else {
+ // min and max after last block
+ // small last split
+ endOffset = fileStatus.getLen();
+ blockLocation = lastBlock;
+ }
+ resultSplits.add(
+ new ParquetInputSplit(
+ fileStatus.getPath(),
+ startOffset, endOffset, endOffset - startOffset,
+ blockLocation == null ? new String[0] : blockLocation.getHosts(),
+ null,
+ requestedSchema, readSupportMetadata));
+ }
+ return resultSplits;
+ }
+}
+class ClientSideMetadataSplitStrategy extends SplitStrategy {
+ //Wrapper of hdfs blocks, keep track of which HDFS block is being used
+ private static class HDFSBlocks {
+ BlockLocation[] hdfsBlocks;
+ int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
+ int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
+
+ private HDFSBlocks(BlockLocation[] hdfsBlocks) {
+ this.hdfsBlocks = hdfsBlocks;
+ Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
+ @Override
+ public int compare(BlockLocation b1, BlockLocation b2) {
+ return Long.signum(b1.getOffset() - b2.getOffset());
+ }
+ };
+ Arrays.sort(hdfsBlocks, comparator);
+ }
+
+ private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
+ BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
+ return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
+ }
+
+ /**
+ * @param rowGroupMetadata
+ * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
+ * return false if the mid point of row group is in the same hdfs block
+ */
+ private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
+ boolean isNewHdfsBlock = false;
+ long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
+
+ //if mid point is not in the current HDFS block any more, return true
+ while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
+ isNewHdfsBlock = true;
+ currentMidPointHDFSBlockIndex++;
+ if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
+ throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
+ + rowGroupMidPoint
+ + ", the end of the hdfs block is "
+ + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
+ }
+
+ while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
+ currentStartHdfsBlockIndex++;
+ if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
+ throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
+ + rowGroupMetadata.getStartingPos()
+ + " but the end of hdfs blocks of file is "
+ + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
+ }
+ return isNewHdfsBlock;
+ }
+
+ public BlockLocation getCurrentBlock() {
+ return hdfsBlocks[currentStartHdfsBlockIndex];
+ }
+ }
+
+ static class SplitInfo {
+ List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
+ BlockLocation hdfsBlock;
+ long compressedByteSize = 0L;
+
+ public SplitInfo(BlockLocation currentBlock) {
+ this.hdfsBlock = currentBlock;
+ }
+
+ private void addRowGroup(BlockMetaData rowGroup) {
+ this.rowGroups.add(rowGroup);
+ this.compressedByteSize += rowGroup.getCompressedSize();
+ }
+
+ public long getCompressedByteSize() {
+ return compressedByteSize;
+ }
+
+ public List<BlockMetaData> getRowGroups() {
+ return rowGroups;
+ }
+
+ int getRowGroupCount() {
+ return rowGroups.size();
+ }
+
+ public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
+ MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+ long length = 0;
+
+ for (BlockMetaData block : this.getRowGroups()) {
+ List<ColumnChunkMetaData> columns = block.getColumns();
+ for (ColumnChunkMetaData column : columns) {
+ if (requested.containsPath(column.getPath().toArray())) {
+ length += column.getTotalSize();
+ }
+ }
+ }
+
+ BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
+ long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+ long[] rowGroupOffsets = new long[this.getRowGroupCount()];
+ for (int i = 0; i < rowGroupOffsets.length; i++) {
+ rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
+ }
+
+ return new ParquetInputSplit(
+ fileStatus.getPath(),
+ hdfsBlock.getOffset(),
+ end,
+ length,
+ hdfsBlock.getHosts(),
+ rowGroupOffsets,
+ requestedSchema,
+ readSupportMetadata
+ );
+ }
+ }
+
+ private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
+
+ @Override
+ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+ long maxSplitSize, long minSplitSize, ReadContext readContext)
+ throws IOException {
+ List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+ Filter filter = ParquetInputFormat.getFilter(configuration);
+
+ long rowGroupsDropped = 0;
+ long totalRowGroups = 0;
+
+ for (Footer footer : footers) {
+ final Path file = footer.getFile();
+ LOG.debug(file);
+ FileSystem fs = file.getFileSystem(configuration);
+ FileStatus fileStatus = fs.getFileStatus(file);
+ ParquetMetadata parquetMetaData = footer.getParquetMetadata();
+ List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+ List<BlockMetaData> filteredBlocks;
+
+ totalRowGroups += blocks.size();
+ filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+ rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+ if (filteredBlocks.isEmpty()) {
+ continue;
+ }
+
+ BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ splits.addAll(
+ generateSplits(
+ filteredBlocks,
+ fileBlockLocations,
+ fileStatus,
+ readContext.getRequestedSchema().toString(),
+ readContext.getReadSupportMetadata(),
+ minSplitSize,
+ maxSplitSize)
+ );
+ }
+
+ if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+ int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+ LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+ } else {
+ LOG.info("There were no row groups that could be dropped due to filter predicates");
+ }
+ return splits;
+ }
+
+ /**
+ * Groups together all the row groups (data blocks) that fall within the same HDFS block
+ *
+ * @param rowGroupBlocks data blocks (row groups)
+ * @param hdfsBlocksArray hdfs blocks
+ * @param fileStatus the containing file
+ * @param requestedSchema the schema requested by the user
+ * @param readSupportMetadata the metadata provided by the readSupport implementation in init
+ * @param minSplitSize the mapred.min.split.size
+ * @param maxSplitSize the mapred.max.split.size
+ * @return the splits (one per HDFS block)
+ * @throws IOException If hosts can't be retrieved for the HDFS block
+ */
+ static <T> List<ParquetInputSplit> generateSplits(
+ List<BlockMetaData> rowGroupBlocks,
+ BlockLocation[] hdfsBlocksArray,
+ FileStatus fileStatus,
+ String requestedSchema,
+ Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+
+ List<SplitInfo> splitRowGroups =
+ generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
+
+ //generate one ParquetInputSplit per SplitInfo (one group of row groups)
+ List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+ for (SplitInfo splitInfo : splitRowGroups) {
+ ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
+ resultSplits.add(split);
+ }
+ return resultSplits;
+ }
+
+ static List<SplitInfo> generateSplitInfo(
+ List<BlockMetaData> rowGroupBlocks,
+ BlockLocation[] hdfsBlocksArray,
+ long minSplitSize, long maxSplitSize) {
+ List<SplitInfo> splitRowGroups;
+
+ if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+ throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+ }
+ HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
+ hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
+ SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+
+ //assign rowGroups to splits
+ splitRowGroups = new ArrayList<SplitInfo>();
+ checkSorted(rowGroupBlocks); // assert row groups are sorted
+ for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
+ if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
+ && currentSplit.getCompressedByteSize() >= minSplitSize
+ && currentSplit.getCompressedByteSize() > 0)
+ || currentSplit.getCompressedByteSize() >= maxSplitSize) {
+ //create a new split
+ splitRowGroups.add(currentSplit);//finish previous split
+ currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+ }
+ currentSplit.addRowGroup(rowGroupMetadata);
+ }
+
+ if (currentSplit.getRowGroupCount() > 0) {
+ splitRowGroups.add(currentSplit);
+ }
+
+ return splitRowGroups;
+ }
+
+ private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
+ long previousOffset = 0L;
+ for (BlockMetaData rowGroup : rowGroupBlocks) {
+ long currentOffset = rowGroup.getStartingPos();
+ if (currentOffset < previousOffset) {
+ throw new ParquetDecodingException("row groups are not sorted: previous row group starts at " + previousOffset + ", current row group starts at " + currentOffset);
+ }
+ previousOffset = currentOffset;
+ }
+ }
+}
\ No newline at end of file
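The decision rule implemented by generateSplitInfo above is easiest to see in isolation. The following self-contained sketch (hypothetical class and values, with plain offsets standing in for BlockMetaData and BlockLocation) reproduces the same rule: finish the current split when a row group starts in a new HDFS block and the split has already reached minSplitSize, or when the split has reached maxSplitSize.

import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained illustration of the split-assignment rule.
public class SplitAssignmentSketch {
  public static void main(String[] args) {
    long blockSize = 128L << 20;    // HDFS block size: 128 MB
    long minSplitSize = 64L << 20;
    long maxSplitSize = 256L << 20;
    // sorted (startOffset, compressedSize) pairs, one per row group
    long[][] rowGroups = { {0L, 80L << 20}, {80L << 20, 80L << 20}, {160L << 20, 80L << 20} };

    List<List<long[]>> splits = new ArrayList<List<long[]>>();
    List<long[]> current = new ArrayList<long[]>();
    long currentBytes = 0L;
    long currentHdfsBlock = -1L;
    for (long[] rg : rowGroups) {
      long hdfsBlock = rg[0] / blockSize;  // block containing the row group's start
      boolean newBlock = hdfsBlock != currentHdfsBlock;
      if ((newBlock && currentBytes >= minSplitSize && currentBytes > 0)
          || currentBytes >= maxSplitSize) {
        splits.add(current);               // finish the previous split
        current = new ArrayList<long[]>();
        currentBytes = 0L;
      }
      currentHdfsBlock = hdfsBlock;
      current.add(rg);
      currentBytes += rg[1];
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    // For this layout: row groups 0 and 1 share HDFS block 0, and row group 2
    // starts in block 1 after 160 MB have accumulated => 2 splits.
    System.out.println(splits.size() + " splits");
  }
}

The real implementation additionally keeps the BlockLocation for the block each split starts in, so getParquetInputSplit can report that block's hosts for locality.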
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index da0c2ec..399be64 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -15,54 +15,47 @@
*/
package parquet.hadoop;
-import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import parquet.Log;
-import parquet.column.Encoding;
-import parquet.column.statistics.IntStatistics;
-import parquet.common.schema.ColumnPath;
+import parquet.bytes.BytesUtils;
import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
/**
* An input split for the Parquet format
* It contains the information to read one block of the file.
*
+ * This class is private to the ParquetInputFormat.
+ * Backward compatibility is not maintained.
+ *
* @author Julien Le Dem
*/
+@Private
public class ParquetInputSplit extends FileSplit implements Writable {
- private static final Log LOG = Log.getLog(ParquetInputSplit.class);
- private List<BlockMetaData> blocks;
+ private long end;
+ private long[] rowGroupOffsets;
private String requestedSchema;
- private String fileSchema;
- private Map<String, String> extraMetadata;
private Map<String, String> readSupportMetadata;
-
/**
* Writables must have a parameterless constructor
*/
@@ -71,19 +64,19 @@ public class ParquetInputSplit extends FileSplit implements Writable {
}
/**
- * Used by {@link ParquetInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * @param path the path to the file
- * @param start the offset of the block in the file
- * @param length the size of the block in the file
- * @param hosts the hosts where this block can be found
- * @param blocks the block meta data (Columns locations)
- * @param schema the file schema
- * @param readSupportClass the class used to materialize records
- * @param requestedSchema the requested schema for materialization
- * @param fileSchema the schema of the file
- * @param extraMetadata the app specific meta data in the file
- * @param readSupportMetadata the read support specific metadata
+ * For backward compatibility only.
+ * Use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[], String, Map)} instead.
+ * @param path
+ * @param start
+ * @param length
+ * @param hosts
+ * @param blocks
+ * @param requestedSchema
+ * @param fileSchema
+ * @param extraMetadata
+ * @param readSupportMetadata
*/
+ @Deprecated
public ParquetInputSplit(
Path path,
long start,
@@ -94,212 +87,168 @@ public class ParquetInputSplit extends FileSplit implements Writable {
String fileSchema,
Map<String, String> extraMetadata,
Map<String, String> readSupportMetadata) {
- super(path, start, length, hosts);
- this.blocks = blocks;
- this.requestedSchema = requestedSchema;
- this.fileSchema = fileSchema;
- this.extraMetadata = extraMetadata;
- this.readSupportMetadata = readSupportMetadata;
+ this(
+ path, start, length, end(blocks), hosts,
+ offsets(blocks),
+ requestedSchema, readSupportMetadata
+ );
+ }
+
+ private static long end(List<BlockMetaData> blocks) {
+ BlockMetaData last = blocks.get(blocks.size() - 1);
+ return last.getStartingPos() + last.getCompressedSize();
+ }
+
+ private static long[] offsets(List<BlockMetaData> blocks) {
+ long[] offsets = new long[blocks.size()];
+ for (int i = 0; i < offsets.length; i++) {
+ offsets[i] = blocks.get(i).getStartingPos();
+ }
+ return offsets;
}
/**
- * @return the block meta data
+ * @param file the path of the file for that split
+ * @param start the start offset in the file
+ * @param end the end offset in the file
+ * @param length the actual size in bytes that we expect to read
+ * @param hosts the hosts with the replicas of this data
+ * @param rowGroupOffsets the offsets of the row groups selected if loaded on the client
+ * @param requestedSchema the user requested schema
+ * @param readSupportMetadata metadata from the read support
*/
- public List<BlockMetaData> getBlocks() {
- return blocks;
+ public ParquetInputSplit(
+ Path file, long start, long end, long length, String[] hosts,
+ long[] rowGroupOffsets,
+ String requestedSchema,
+ Map<String, String> readSupportMetadata) {
+ super(file, start, length, hosts);
+ this.end = end;
+ this.rowGroupOffsets = rowGroupOffsets;
+ this.requestedSchema = requestedSchema;
+ this.readSupportMetadata = readSupportMetadata;
}
/**
* @return the requested schema
*/
- public String getRequestedSchema() {
+ String getRequestedSchema() {
return requestedSchema;
}
/**
- * @return the file schema
+ * @return the end offset of that split
*/
- public String getFileSchema() {
- return fileSchema;
+ long getEnd() {
+ return end;
}
/**
- * @return app specific metadata from the file
+ * @return app specific metadata provided by the read support in the init phase
*/
- public Map<String, String> getExtraMetadata() {
- return extraMetadata;
+ Map<String, String> getReadSupportMetadata() {
+ return readSupportMetadata;
}
/**
- * @return app specific metadata provided by the read support in the init phase
+ * @return the offsets of the row groups selected if this has been determined on the client side
*/
- public Map<String, String> getReadSupportMetadata() {
- return readSupportMetadata;
+ long[] getRowGroupOffsets() {
+ return rowGroupOffsets;
+ }
+
+ @Override
+ public String toString() {
+ String hosts;
+ try {
+ hosts = Arrays.toString(getLocations());
+ } catch (Exception e) {
+ // IOException/InterruptedException could be thrown
+ hosts = "(" + e + ")";
+ }
+
+ return this.getClass().getSimpleName() + "{" +
+ "part: " + getPath()
+ + " start: " + getStart()
+ + " end: " + getEnd()
+ + " length: " + getLength()
+ + " hosts: " + hosts
+ + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
+ + " requestedSchema: " + requestedSchema
+ + " readSupportMetadata: " + readSupportMetadata
+ + "}";
}
/**
* {@inheritDoc}
*/
@Override
- public void readFields(DataInput in) throws IOException {
+ public final void readFields(DataInput hin) throws IOException {
+ byte[] bytes = readArray(hin);
+ DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
super.readFields(in);
- int blocksSize = in.readInt();
- this.blocks = new ArrayList<BlockMetaData>(blocksSize);
- for (int i = 0; i < blocksSize; i++) {
- blocks.add(readBlock(in));
+ this.end = in.readLong();
+ if (in.readBoolean()) {
+ this.rowGroupOffsets = new long[in.readInt()];
+ for (int i = 0; i < rowGroupOffsets.length; i++) {
+ rowGroupOffsets[i] = in.readLong();
+ }
}
- this.requestedSchema = decompressString(in);
- this.fileSchema = decompressString(in);
- this.extraMetadata = readKeyValues(in);
+ this.requestedSchema = readUTF8(in);
this.readSupportMetadata = readKeyValues(in);
+ in.close();
}
/**
* {@inheritDoc}
*/
@Override
- public void write(DataOutput out) throws IOException {
+ public final void write(DataOutput hout) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
super.write(out);
- out.writeInt(blocks.size());
- for (BlockMetaData block : blocks) {
- writeBlock(out, block);
- }
- byte[] compressedSchema = compressString(requestedSchema);
- out.writeInt(compressedSchema.length);
- out.write(compressedSchema);
- compressedSchema = compressString(fileSchema);
- out.writeInt(compressedSchema.length);
- out.write(compressedSchema);
- writeKeyValues(out, extraMetadata);
- writeKeyValues(out, readSupportMetadata);
- }
-
- byte[] compressString(String str) {
- ByteArrayOutputStream obj = new ByteArrayOutputStream();
- GZIPOutputStream gzip;
- try {
- gzip = new GZIPOutputStream(obj);
- gzip.write(str.getBytes("UTF-8"));
- gzip.close();
- } catch (IOException e) {
- // Not really sure how we can get here. I guess the best thing to do is to croak.
- LOG.error("Unable to gzip InputSplit string " + str, e);
- throw new RuntimeException("Unable to gzip InputSplit string", e);
- }
- return obj.toByteArray();
- }
-
- String decompressString(DataInput in) throws IOException {
- int len = in.readInt();
- byte[] bytes = new byte[len];
- in.readFully(bytes);
- return decompressString(bytes);
- }
-
- String decompressString(byte[] bytes) {
- ByteArrayInputStream obj = new ByteArrayInputStream(bytes);
- GZIPInputStream gzip = null;
- String outStr = "";
- try {
- gzip = new GZIPInputStream(obj);
- BufferedReader reader = new BufferedReader(new InputStreamReader(gzip, "UTF-8"));
- char[] buffer = new char[1024];
- int n = 0;
- StringBuilder sb = new StringBuilder();
- while (-1 != (n = reader.read(buffer))) {
- sb.append(buffer, 0, n);
- }
- outStr = sb.toString();
- } catch (IOException e) {
- // Not really sure how we can get here. I guess the best thing to do is to croak.
- LOG.error("Unable to uncompress InputSplit string", e);
- throw new RuntimeException("Unable to uncompress InputSplit String", e);
- } finally {
- if (null != gzip) {
- try {
- gzip.close();
- } catch (IOException e) {
- LOG.error("Unable to uncompress InputSplit", e);
- throw new RuntimeException("Unable to uncompress InputSplit String", e);
- }
+ out.writeLong(end);
+ out.writeBoolean(rowGroupOffsets != null);
+ if (rowGroupOffsets != null) {
+ out.writeInt(rowGroupOffsets.length);
+ for (long o : rowGroupOffsets) {
+ out.writeLong(o);
}
}
- return outStr;
+ writeUTF8(out, requestedSchema);
+ writeKeyValues(out, readSupportMetadata);
+ out.close();
+ writeArray(hout, baos.toByteArray());
}
- private BlockMetaData readBlock(DataInput in) throws IOException {
- final BlockMetaData block = new BlockMetaData();
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- block.addColumn(readColumn(in));
- }
- block.setRowCount(in.readLong());
- block.setTotalByteSize(in.readLong());
- if (!in.readBoolean()) {
- block.setPath(in.readUTF().intern());
- }
- return block;
+ private static void writeUTF8(DataOutput out, String string) throws IOException {
+ byte[] bytes = string.getBytes(BytesUtils.UTF8);
+ writeArray(out, bytes);
}
- private void writeBlock(DataOutput out, BlockMetaData block)
- throws IOException {
- out.writeInt(block.getColumns().size());
- for (ColumnChunkMetaData column : block.getColumns()) {
- writeColumn(out, column);
- }
- out.writeLong(block.getRowCount());
- out.writeLong(block.getTotalByteSize());
- out.writeBoolean(block.getPath() == null);
- if (block.getPath() != null) {
- out.writeUTF(block.getPath());
- }
+ private static String readUTF8(DataInput in) throws IOException {
+ byte[] bytes = readArray(in);
+ return new String(bytes, BytesUtils.UTF8).intern();
}
- private ColumnChunkMetaData readColumn(DataInput in)
- throws IOException {
- CompressionCodecName codec = CompressionCodecName.values()[in.readInt()];
- String[] columnPath = new String[in.readInt()];
- for (int i = 0; i < columnPath.length; i++) {
- columnPath[i] = in.readUTF().intern();
- }
- PrimitiveTypeName type = PrimitiveTypeName.values()[in.readInt()];
- int encodingsSize = in.readInt();
- Set<Encoding> encodings = new HashSet<Encoding>(encodingsSize);
- for (int i = 0; i < encodingsSize; i++) {
- encodings.add(Encoding.values()[in.readInt()]);
- }
- IntStatistics emptyStats = new IntStatistics();
- ColumnChunkMetaData column = ColumnChunkMetaData.get(
- ColumnPath.get(columnPath), type, codec, encodings, emptyStats,
- in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
- return column;
+ private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
+ out.writeInt(bytes.length);
+ out.write(bytes, 0, bytes.length);
}
- private void writeColumn(DataOutput out, ColumnChunkMetaData column)
- throws IOException {
- out.writeInt(column.getCodec().ordinal());
- out.writeInt(column.getPath().size());
- for (String s : column.getPath()) {
- out.writeUTF(s);
- }
- out.writeInt(column.getType().ordinal());
- out.writeInt(column.getEncodings().size());
- for (Encoding encoding : column.getEncodings()) {
- out.writeInt(encoding.ordinal());
- }
- out.writeLong(column.getFirstDataPageOffset());
- out.writeLong(column.getDictionaryPageOffset());
- out.writeLong(column.getValueCount());
- out.writeLong(column.getTotalSize());
- out.writeLong(column.getTotalUncompressedSize());
+ private static byte[] readArray(DataInput in) throws IOException {
+ int len = in.readInt();
+ byte[] bytes = new byte[len];
+ in.readFully(bytes);
+ return bytes;
}
private Map<String, String> readKeyValues(DataInput in) throws IOException {
int size = in.readInt();
Map<String, String> map = new HashMap<String, String>(size);
for (int i = 0; i < size; i++) {
- String key = decompressString(in).intern();
- String value = decompressString(in).intern();
+ String key = readUTF8(in).intern();
+ String value = readUTF8(in).intern();
map.put(key, value);
}
return map;
@@ -311,35 +260,10 @@ public class ParquetInputSplit extends FileSplit implements Writable {
} else {
out.writeInt(map.size());
for (Entry<String, String> entry : map.entrySet()) {
- byte[] compr = compressString(entry.getKey());
- out.writeInt(compr.length);
- out.write(compr);
- compr = compressString(entry.getValue());
- out.writeInt(compr.length);
- out.write(compr);
+ writeUTF8(out, entry.getKey());
+ writeUTF8(out, entry.getValue());
}
}
}
-
- @Override
- public String toString() {
- String hosts[] = {};
- try{
- hosts = getLocations();
- }catch(Exception ignore){} // IOException/InterruptedException could be thrown
-
- return this.getClass().getSimpleName() + "{" +
- "part: " + getPath()
- + " start: " + getStart()
- + " length: " + getLength()
- + " hosts: " + Arrays.toString(hosts)
- + " blocks: " + blocks.size()
- + " requestedSchema: " + (fileSchema.equals(requestedSchema) ? "same as file" : requestedSchema)
- + " fileSchema: " + fileSchema
- + " extraMetadata: " + extraMetadata
- + " readSupportMetadata: " + readSupportMetadata
- + "}";
- }
-
}
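The new readFields/write pair above replaces the old per-field gzip round trips with a single length-prefixed, gzipped frame containing the whole split. A minimal round-trip sketch of that framing (hypothetical class and helper names, carrying only an end offset and a schema string):

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of the framing used by ParquetInputSplit:
// serialize everything into one buffer, gzip it once, prefix with its length.
public class SplitFramingSketch {
  static void writeFrame(DataOutput hout, long end, String schema) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
    out.writeLong(end);
    byte[] utf8 = schema.getBytes("UTF-8");
    out.writeInt(utf8.length);
    out.write(utf8);
    out.close();                         // flushes the gzip trailer
    byte[] frame = baos.toByteArray();
    hout.writeInt(frame.length);         // length prefix on the outer stream
    hout.write(frame);
  }

  static void readFrame(DataInput hin) throws IOException {
    byte[] frame = new byte[hin.readInt()];
    hin.readFully(frame);
    DataInputStream in = new DataInputStream(
        new GZIPInputStream(new ByteArrayInputStream(frame)));
    long end = in.readLong();
    byte[] utf8 = new byte[in.readInt()];
    in.readFully(utf8);
    in.close();
    System.out.println("end=" + end + " schema=" + new String(utf8, "UTF-8"));
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    writeFrame(new DataOutputStream(wire), 1024L, "message m { required int32 id; }");
    readFrame(new DataInputStream(new ByteArrayInputStream(wire.toByteArray())));
  }
}

Compressing one buffer instead of each string separately both shrinks the split and lets the gzip dictionary span the schema and the metadata map.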
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 31917d2..940b893 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -18,7 +18,6 @@ package parquet.hadoop;
import java.io.IOException;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 6703001..74f4051 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import parquet.Log;
-import parquet.Preconditions;
import parquet.column.ParquetProperties.WriterVersion;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.api.WriteSupport.WriteContext;
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index c56a402..ec839e2 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -15,6 +15,8 @@
*/
package parquet.hadoop;
+import static parquet.Preconditions.checkNotNull;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
@@ -39,8 +41,6 @@ import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.GlobalMetaData;
import parquet.schema.MessageType;
-import static parquet.Preconditions.checkNotNull;
-
/**
* Read records from a Parquet file.
* TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
@@ -114,7 +114,7 @@ public class ParquetReader<T> implements Closeable {
FileSystem fs = file.getFileSystem(conf);
List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+ List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
this.footersIterator = footers.iterator();
globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
MessageType schema = globalMetaData.getSchema();
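The extra boolean on readAllFootersInParallelUsingSummaryFiles is the client-side switch introduced by this change; ParquetReader passes false because it consumes row groups directly. A sketch of a caller that only needs file-level metadata (hypothetical class and path; assuming the boolean is the skip-row-groups flag added by this patch):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;

public class SchemaOnlyFooters {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]);           // dataset directory (hypothetical)
    FileSystem fs = dir.getFileSystem(conf);
    List<FileStatus> statuses = Arrays.asList(fs.listStatus(dir));
    // true = skip row-group metadata on the client; tasks read it later.
    List<Footer> footers =
        ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, true);
    for (Footer f : footers) {
      System.out.println(f.getFile() + " : "
          + f.getParquetMetadata().getFileMetaData().getSchema());
    }
  }
}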