You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/03 22:04:21 UTC

parquet-mr git commit: PARQUET-674: Add InputFile abstraction for openable files.

Repository: parquet-mr
Updated Branches:
  refs/heads/master e54ca615f -> b59be8659


PARQUET-674: Add InputFile abstraction for openable files.

Author: Ryan Blue <bl...@apache.org>

Closes #368 from rdblue/PARQUET-674-add-data-source and squashes the following commits:

8c689e9 [Ryan Blue] PARQUET-674: Implement review comments.
4a7c327 [Ryan Blue] PARQUET-674: Add DataSource abstraction for openable files.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b59be865
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b59be865
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b59be865

Branch: refs/heads/master
Commit: b59be86597cfcd805c24fa406af46071400e24c8
Parents: e54ca61
Author: Ryan Blue <bl...@apache.org>
Authored: Mon Oct 3 15:04:12 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Mon Oct 3 15:04:12 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/parquet/io/InputFile.java   | 43 +++++++++++++
 .../parquet/hadoop/ParquetFileReader.java       | 32 ++++++----
 .../parquet/hadoop/util/HadoopInputFile.java    | 66 ++++++++++++++++++++
 3 files changed, 130 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
new file mode 100644
index 0000000..e2c7cc0
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+
+/**
+ * A readable data file that Parquet can size and open as a
+ * {@link SeekableInputStream}.
+ */
+public interface InputFile {
+
+  /**
+   * Opens a fresh {@link SeekableInputStream} over the file's bytes.
+   * @return a new stream that the caller is responsible for closing
+   * @throws IOException if the stream cannot be opened.
+   */
+  SeekableInputStream newStream() throws IOException;
+
+  /**
+   * @return the total length of the file, in bytes
+   * @throws IOException if the length cannot be determined
+   */
+  long getLength() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 59a7e46..57cdb7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -88,11 +88,13 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.InputFile;
 
 /**
  * Internal implementation of the Parquet file reader as a block container
@@ -410,8 +412,7 @@ public class ParquetFileReader implements Closeable {
    * @throws IOException  if an error occurs while reading the file
    */
   public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+    return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
   }
 
   /**
@@ -431,12 +432,21 @@ public class ParquetFileReader implements Closeable {
    * @throws IOException if an error occurs while reading the file
    */
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getPath().getFileSystem(configuration);
-    SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
-    try {
-      return readFooter(file.getLen(), file.getPath().toString(), in, filter);
-    } finally {
-      in.close();
+    return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
+  }
+
+  /**
+   * Reads the meta data block in the footer of the given {@link InputFile}.
+   * A new stream is opened with {@link InputFile#newStream()} and closed before returning.
+   * @param file an {@link InputFile} to read
+   * @param filter the filter to apply to row groups
+   * @return the metadata blocks in the footer
+   * @throws IOException if an error occurs while reading the file
+   */
+  public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter)
+      throws IOException {
+    try (SeekableInputStream in = file.newStream()) {
+      return readFooter(converter, file.getLength(), file.toString(), in, filter);
     }
   }
 
@@ -449,7 +459,7 @@ public class ParquetFileReader implements Closeable {
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
-  public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
+  private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
     if (Log.DEBUG) {
       LOG.debug("File length " + fileLen);
     }
@@ -563,7 +573,7 @@ public class ParquetFileReader implements Closeable {
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
     this.f = HadoopStreams.wrap(fs.open(file));
-    this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
+    this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = footer.getBlocks();
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
@@ -602,7 +612,7 @@ public class ParquetFileReader implements Closeable {
     if (footer == null) {
       try {
         // don't read the row groups because this.blocks is always set
-        this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
+        this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
       } catch (IOException e) {
         throw new ParquetDecodingException("Unable to read file footer", e);
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b59be865/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
new file mode 100644
index 0000000..d5868d3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import java.io.IOException;
+
+/** An {@link InputFile} backed by a file in a Hadoop {@link FileSystem}. */
+public class HadoopInputFile implements InputFile {
+
+  private final FileSystem fs;
+  private final FileStatus stat;
+
+  /** Resolves the file system for {@code path} and looks up its status. */
+  public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    return new HadoopInputFile(fileSystem, fileSystem.getFileStatus(path));
+  }
+
+  /** Wraps an existing status; only the file system is resolved from {@code conf}. */
+  public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) throws IOException {
+    return new HadoopInputFile(stat.getPath().getFileSystem(conf), stat);
+  }
+
+  private HadoopInputFile(FileSystem fs, FileStatus stat) {
+    this.fs = fs;
+    this.stat = stat;
+  }
+
+  /** Returns the length from the {@link FileStatus} captured at construction. */
+  @Override
+  public long getLength() {
+    return stat.getLen();
+  }
+
+  /** Opens the underlying file and adapts the Hadoop stream to Parquet's API. */
+  @Override
+  public SeekableInputStream newStream() throws IOException {
+    return HadoopStreams.wrap(fs.open(stat.getPath()));
+  }
+
+  @Override
+  public String toString() { return stat.getPath().toString(); }
+}