You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/03 22:04:21 UTC
parquet-mr git commit: PARQUET-674: Add InputFile abstraction for openable files.
Repository: parquet-mr
Updated Branches:
refs/heads/master e54ca615f -> b59be8659
PARQUET-674: Add InputFile abstraction for openable files.
Author: Ryan Blue <bl...@apache.org>
Closes #368 from rdblue/PARQUET-674-add-data-source and squashes the following commits:
8c689e9 [Ryan Blue] PARQUET-674: Implement review comments.
4a7c327 [Ryan Blue] PARQUET-674: Add DataSource abstraction for openable files.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b59be865
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b59be865
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b59be865
Branch: refs/heads/master
Commit: b59be86597cfcd805c24fa406af46071400e24c8
Parents: e54ca61
Author: Ryan Blue <bl...@apache.org>
Authored: Mon Oct 3 15:04:12 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Mon Oct 3 15:04:12 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/parquet/io/InputFile.java | 43 +++++++++++++
.../parquet/hadoop/ParquetFileReader.java | 32 ++++++----
.../parquet/hadoop/util/HadoopInputFile.java | 66 ++++++++++++++++++++
3 files changed, 130 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
new file mode 100644
index 0000000..e2c7cc0
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+
+/**
+ * {@code InputFile} is an interface with the methods needed by Parquet to read
+ * data files using {@link SeekableInputStream} instances.
+ */
+public interface InputFile {
+
+ /**
+ * Returns the total length of the file, in bytes.
+ * @throws IOException if the length cannot be determined
+ */
+ long getLength() throws IOException;
+
+ /**
+ * Opens a new {@link SeekableInputStream} for the underlying
+ * data file.
+ * @throws IOException if the stream cannot be opened.
+ */
+ SeekableInputStream newStream() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 59a7e46..57cdb7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -88,11 +88,13 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.InputFile;
/**
* Internal implementation of the Parquet file reader as a block container
@@ -410,8 +412,7 @@ public class ParquetFileReader implements Closeable {
* @throws IOException if an error occurs while reading the file
*/
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
- FileSystem fileSystem = file.getFileSystem(configuration);
- return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+ return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}
/**
@@ -431,12 +432,21 @@ public class ParquetFileReader implements Closeable {
* @throws IOException if an error occurs while reading the file
*/
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
- FileSystem fileSystem = file.getPath().getFileSystem(configuration);
- SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
- try {
- return readFooter(file.getLen(), file.getPath().toString(), in, filter);
- } finally {
- in.close();
+ return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
+ }
+
+ /**
+ * Reads the meta data block in the footer of the file, using a stream opened from the provided input file
+ * @param file an {@link InputFile} to read
+ * @param filter the filter to apply to row groups
+ * @return the metadata blocks in the footer
+ * @throws IOException if an error occurs while reading the file
+ */
+ public static final ParquetMetadata readFooter(
+ InputFile file, MetadataFilter filter) throws IOException {
+ try (SeekableInputStream in = file.newStream()) {
+ return readFooter(converter, file.getLength(), file.toString(),
+ in, filter);
}
}
@@ -449,7 +459,7 @@ public class ParquetFileReader implements Closeable {
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
- public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
+ private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
if (Log.DEBUG) {
LOG.debug("File length " + fileLen);
}
@@ -563,7 +573,7 @@ public class ParquetFileReader implements Closeable {
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
this.f = HadoopStreams.wrap(fs.open(file));
- this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
+ this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.fileMetaData = footer.getFileMetaData();
this.blocks = footer.getBlocks();
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
@@ -602,7 +612,7 @@ public class ParquetFileReader implements Closeable {
if (footer == null) {
try {
// don't read the row groups because this.blocks is always set
- this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
+ this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
} catch (IOException e) {
throw new ParquetDecodingException("Unable to read file footer", e);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
new file mode 100644
index 0000000..d5868d3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import java.io.IOException;
+
+public class HadoopInputFile implements InputFile {
+
+ private final FileSystem fs;
+ private final FileStatus stat;
+
+ public static HadoopInputFile fromPath(Path path, Configuration conf)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ return new HadoopInputFile(fs, fs.getFileStatus(path));
+ }
+
+ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
+ throws IOException {
+ FileSystem fs = stat.getPath().getFileSystem(conf);
+ return new HadoopInputFile(fs, stat);
+ }
+
+ private HadoopInputFile(FileSystem fs, FileStatus stat) {
+ this.fs = fs;
+ this.stat = stat;
+ }
+
+ @Override
+ public long getLength() {
+ return stat.getLen();
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+ return HadoopStreams.wrap(fs.open(stat.getPath()));
+ }
+
+ @Override
+ public String toString() {
+ return stat.getPath().toString();
+ }
+}