You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/07 00:01:13 UTC

[GitHub] [iceberg] rdblue commented on a diff in pull request #6293: Added FileIO Support for ORC Reader and Writers

rdblue commented on code in PR #6293:
URL: https://github.com/apache/iceberg/pull/6293#discussion_r1041600523


##########
orc/src/main/java/org/apache/iceberg/orc/ORC.java:
##########
@@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) {
     ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
     if (file instanceof HadoopInputFile) {
       readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+    } else {
+      readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength());
     }
     return newFileReader(file.location(), readerOptions);
   }
+
+  static Writer newFileWriter(
+      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
+    if (file instanceof HadoopOutputFile) {
+      options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+    } else {
+      options.fileSystem(new OutputFileSystem(file));
+    }
+    final Path locPath = new Path(file.location());
+    final Writer writer;
+
+    try {
+      writer = OrcFile.createWriter(locPath, options);
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
+    }
+
+    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+
+    return writer;
+  }
+
+  private static class WrappedSeekableInputStream extends FSInputStream {
+    private final SeekableInputStream inputStream;
+    private boolean closed;
+    private final StackTraceElement[] createStack;
+
+    private WrappedSeekableInputStream(SeekableInputStream inputStream) {
+      this.inputStream = inputStream;
+      this.createStack = Thread.currentThread().getStackTrace();
+      this.closed = false;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      inputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return inputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException("seekToNewSource not supported");
+    }
+
+    @Override
+    public int read() throws IOException {
+      return inputStream.read();
+    }
+
+    @Override
+    public int read(@NotNull byte[] b, int off, int len) throws IOException {
+      return inputStream.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+      closed = true;
+    }
+
+    @SuppressWarnings("checkstyle:NoFinalizer")
+    @Override
+    protected void finalize() throws Throwable {
+      super.finalize();
+      if (!closed) {
+        close(); // releasing resources is more important than printing the warning
+        String trace =
+            Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+        LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+      }
+    }
+  }
+
+  private static class NullFileSystem extends FileSystem {
+
+    @Override
+    public URI getUri() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream create(
+        Path f,
+        FsPermission permission,
+        boolean overwrite,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class InputFileSystem extends NullFileSystem {
+    private final InputFile inputFile;
+    private final Path inputPath;
+
+    InputFileSystem(InputFile inputFile) {
+      this.inputFile = inputFile;
+      this.inputPath = new Path(inputFile.location());
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      return open(f, 0);

Review Comment:
   Rather than making up a fake buffer size here, I think it would be better to have `open(Path, int)` call this method (`open(Path)`) and simply discard the buffer size that is passed into `open(Path, int)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org