You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/02 21:58:13 UTC
[incubator-iceberg] branch master updated: Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7676318 Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)
7676318 is described below
commit 76763188c529b6e1e5b08200539b69fd05aee44a
Author: Vlad Rozov <vr...@users.noreply.github.com>
AuthorDate: Mon Mar 2 13:58:01 2020 -0800
Use HadoopInputFile and HadoopOutputFile FileSystem for ORC (#823)
Also, fix a resource leak.
---
.../org/apache/iceberg/hadoop/HadoopInputFile.java | 4 ++++
.../apache/iceberg/hadoop/HadoopOutputFile.java | 4 ++++
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 22 ++++++++++++++++++++++
.../org/apache/iceberg/orc/OrcFileAppender.java | 18 +++++++++---------
.../java/org/apache/iceberg/orc/OrcIterable.java | 14 ++------------
.../java/org/apache/iceberg/orc/OrcMetrics.java | 11 ++++-------
6 files changed, 45 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
index ee03b9b..84c5bf2 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
@@ -161,6 +161,10 @@ public class HadoopInputFile implements InputFile {
return conf;
}
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
public FileStatus getStat() {
return lazyStat();
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
index b95cfff..921a762 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
@@ -96,6 +96,10 @@ public class HadoopOutputFile implements OutputFile {
return conf;
}
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
@Override
public String location() {
return path.toString();
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 6ffaf05..fc88fa9 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -20,12 +20,15 @@
package org.apache.iceberg.orc;
import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
@@ -33,6 +36,9 @@ import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFile.ReaderOptions;
+import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -167,4 +173,20 @@ public class ORC {
return new OrcIterable<>(file, conf, schema, start, length, readerFunc);
}
}
+
+ static Reader newFileReader(String location, ReaderOptions readerOptions) {
+ try {
+ return OrcFile.createReader(new Path(location), readerOptions);
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to open file: %s", location);
+ }
+ }
+
+ static Reader newFileReader(InputFile file, Configuration config) {
+ ReaderOptions readerOptions = OrcFile.readerOptions(config);
+ if (file instanceof HadoopInputFile) {
+ readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+ }
+ return newFileReader(file.location(), readerOptions);
+ }
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 3f67496..99e3f51 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.orc.OrcFile;
@@ -67,6 +68,9 @@ class OrcFileAppender<D> implements FileAppender<D> {
this.batch = orcSchema.createRowBatch(this.batchSize);
OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+ if (file instanceof HadoopOutputFile) {
+ options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+ }
options.setSchema(orcSchema);
this.writer = newOrcWriter(file, options, metadata);
this.valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
@@ -100,16 +104,12 @@ class OrcFileAppender<D> implements FileAppender<D> {
@Override
public List<Long> splitOffsets() {
Preconditions.checkState(isClosed, "File is not yet closed");
- String fileLoc = file.location();
- Reader reader;
- try {
- reader = OrcFile.createReader(new Path(fileLoc), new OrcFile.ReaderOptions(conf));
- } catch (IOException ioe) {
- throw new RuntimeIOException(ioe, "Cannot read file " + fileLoc);
+ try (Reader reader = ORC.newFileReader(file.toInputFile(), conf)) {
+ List<StripeInformation> stripes = reader.getStripes();
+ return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Can't close ORC reader %s", file.location());
}
-
- List<StripeInformation> stripes = reader.getStripes();
- return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
}
@Override
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 1c9d4ca..814c5bc 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -23,13 +23,11 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
-import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -59,7 +57,8 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
@SuppressWarnings("unchecked")
@Override
public Iterator<T> iterator() {
- Reader orcFileReader = newFileReader(file, config);
+ Reader orcFileReader = ORC.newFileReader(file, config);
+ addCloseable(orcFileReader);
TypeDescription readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, orcFileReader.getSchema());
return new OrcIterator(
@@ -84,15 +83,6 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
}
}
- private static Reader newFileReader(InputFile file, Configuration config) {
- try {
- return OrcFile.createReader(new Path(file.location()),
- OrcFile.readerOptions(config));
- } catch (IOException ioe) {
- throw new RuntimeIOException(ioe, "Failed to open file: %s", file);
- }
- }
-
private static class OrcIterator<T> implements Iterator<T> {
private int nextRow;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index 65bc64d..3689ec5 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -22,18 +22,17 @@ package org.apache.iceberg.orc;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
-import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.Writer;
public class OrcMetrics {
- private OrcMetrics() {}
+ private OrcMetrics() {
+ }
public static Metrics fromInputFile(InputFile file) {
final Configuration config = (file instanceof HadoopInputFile) ?
@@ -41,10 +40,8 @@ public class OrcMetrics {
return fromInputFile(file, config);
}
- static Metrics fromInputFile(InputFile file, Configuration config) {
- try {
- final Reader orcReader = OrcFile.createReader(new Path(file.location()),
- OrcFile.readerOptions(config));
+ public static Metrics fromInputFile(InputFile file, Configuration config) {
+ try (Reader orcReader = ORC.newFileReader(file, config)) {
// TODO: implement rest of the methods for ORC metrics
// https://github.com/apache/incubator-iceberg/pull/199