You are viewing a plain text version of this content. The canonical version (with the original hyperlink) is available in the Apache mailing-list archives for commits@iceberg.apache.org.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/02 21:58:13 UTC

[incubator-iceberg] branch master updated: Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7676318  Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)
7676318 is described below

commit 76763188c529b6e1e5b08200539b69fd05aee44a
Author: Vlad Rozov <vr...@users.noreply.github.com>
AuthorDate: Mon Mar 2 13:58:01 2020 -0800

    Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)
    
    Also, fix a resource leak.
---
 .../org/apache/iceberg/hadoop/HadoopInputFile.java |  4 ++++
 .../apache/iceberg/hadoop/HadoopOutputFile.java    |  4 ++++
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  | 22 ++++++++++++++++++++++
 .../org/apache/iceberg/orc/OrcFileAppender.java    | 18 +++++++++---------
 .../java/org/apache/iceberg/orc/OrcIterable.java   | 14 ++------------
 .../java/org/apache/iceberg/orc/OrcMetrics.java    | 11 ++++-------
 6 files changed, 45 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index ee03b9b..84c5bf2 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -161,6 +161,10 @@ public class HadoopInputFile implements InputFile {
     return conf;
   }
 
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
   public FileStatus getStat() {
     return lazyStat();
   }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
index b95cfff..921a762 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
@@ -96,6 +96,10 @@ public class HadoopOutputFile implements OutputFile {
     return conf;
   }
 
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+
   @Override
   public String location() {
     return path.toString();
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 6ffaf05..fc88fa9 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -20,12 +20,15 @@
 package org.apache.iceberg.orc;
 
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
@@ -33,6 +36,9 @@ import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFile.ReaderOptions;
+import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
@@ -167,4 +173,20 @@ public class ORC {
       return new OrcIterable<>(file, conf, schema, start, length, readerFunc);
     }
   }
+
+  static Reader newFileReader(String location, ReaderOptions readerOptions) {
+    try {
+      return OrcFile.createReader(new Path(location), readerOptions);
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", location);
+    }
+  }
+
+  static Reader newFileReader(InputFile file, Configuration config) {
+    ReaderOptions readerOptions = OrcFile.readerOptions(config);
+    if (file instanceof HadoopInputFile) {
+      readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+    }
+    return newFileReader(file.location(), readerOptions);
+  }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 3f67496..99e3f51 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.orc.OrcFile;
@@ -67,6 +68,9 @@ class OrcFileAppender<D> implements FileAppender<D> {
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+    if (file instanceof HadoopOutputFile) {
+      options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+    }
     options.setSchema(orcSchema);
     this.writer = newOrcWriter(file, options, metadata);
     this.valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
@@ -100,16 +104,12 @@ class OrcFileAppender<D> implements FileAppender<D> {
   @Override
   public List<Long> splitOffsets() {
     Preconditions.checkState(isClosed, "File is not yet closed");
-    String fileLoc = file.location();
-    Reader reader;
-    try {
-      reader = OrcFile.createReader(new Path(fileLoc), new OrcFile.ReaderOptions(conf));
-    } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Cannot read file " + fileLoc);
+    try (Reader reader = ORC.newFileReader(file.toInputFile(), conf)) {
+      List<StripeInformation> stripes = reader.getStripes();
+      return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Can't close ORC reader %s", file.location());
     }
-
-    List<StripeInformation> stripes = reader.getStripes();
-    return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
   }
 
   @Override
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 1c9d4ca..814c5bc 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -23,13 +23,11 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -59,7 +57,8 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   @SuppressWarnings("unchecked")
   @Override
   public Iterator<T> iterator() {
-    Reader orcFileReader = newFileReader(file, config);
+    Reader orcFileReader = ORC.newFileReader(file, config);
+    addCloseable(orcFileReader);
     TypeDescription readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, orcFileReader.getSchema());
 
     return new OrcIterator(
@@ -84,15 +83,6 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
     }
   }
 
-  private static Reader newFileReader(InputFile file, Configuration config) {
-    try {
-      return OrcFile.createReader(new Path(file.location()),
-          OrcFile.readerOptions(config));
-    } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to open file: %s", file);
-    }
-  }
-
   private static class OrcIterator<T> implements Iterator<T> {
 
     private int nextRow;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index 65bc64d..3689ec5 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -22,18 +22,17 @@ package org.apache.iceberg.orc;
 import java.io.IOException;
 import java.util.Collections;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
-import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.Writer;
 
 public class OrcMetrics {
 
-  private OrcMetrics() {}
+  private OrcMetrics() {
+  }
 
   public static Metrics fromInputFile(InputFile file) {
     final Configuration config = (file instanceof HadoopInputFile) ?
@@ -41,10 +40,8 @@ public class OrcMetrics {
     return fromInputFile(file, config);
   }
 
-  static Metrics fromInputFile(InputFile file, Configuration config) {
-    try {
-      final Reader orcReader = OrcFile.createReader(new Path(file.location()),
-          OrcFile.readerOptions(config));
+  public static Metrics fromInputFile(InputFile file, Configuration config) {
+    try (Reader orcReader = ORC.newFileReader(file, config)) {
 
       // TODO: implement rest of the methods for ORC metrics
       // https://github.com/apache/incubator-iceberg/pull/199