Posted to commits@orc.apache.org by do...@apache.org on 2020/03/07 21:22:15 UTC

[orc] branch master updated: ORC-602. Create adapter for reading from a FSDataInputStream.

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new e063c16  ORC-602. Create adapter for reading from a FSDataInputStream.
e063c16 is described below

commit e063c1617ecaf18b69d1b103446d309ec247e04b
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon Feb 24 14:34:44 2020 -0800

    ORC-602. Create adapter for reading from a FSDataInputStream.
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/orc/util/StreamWrapperFileSystem.java   | 145 +++++++++++++++++++++
 .../orc/util/TestStreamWrapperFileSystem.java      |  85 ++++++++++++
 2 files changed, 230 insertions(+)

diff --git a/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java b/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java
new file mode 100644
index 0000000..def194f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.OrcConf;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * This class provides an adapter so that tools can read an ORC file from
+ * an FSDataInputStream. Create an instance with the stream, path, and
+ * fileSize, and pass it to the reader as the FileSystem.
+ */
+public class StreamWrapperFileSystem extends FileSystem {
+  private final FSDataInputStream stream;
+  private final FileStatus status;
+
+  /**
+   * Create a FileSystem that only has information about the given stream.
+   * @param stream the data of the stream
+   * @param status the file status of the stream
+   * @param conf the configuration to use
+   */
+  public StreamWrapperFileSystem(FSDataInputStream stream,
+                                 FileStatus status,
+                                 Configuration conf) {
+    this.stream = stream;
+    this.status = status;
+    setConf(conf);
+  }
+
+  /**
+   * Create a FileSystem that only has information about the given stream.
+   * @param stream the data of the stream
+   * @param path the file name of the stream
+   * @param fileSize the length of the stream in bytes
+   * @param conf the configuration to use
+   */
+  public StreamWrapperFileSystem(FSDataInputStream stream,
+                                 Path path,
+                                 long fileSize,
+                                 Configuration conf) {
+    this(stream,
+        new FileStatus(fileSize, false, 1, OrcConf.BLOCK_SIZE.getInt(conf), 0, path),
+        conf);
+  }
+
+  @Override
+  public URI getUri() {
+    return URI.create("stream://" + status.getPath());
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    if (status.getPath().equals(path)) {
+      return stream;
+    } else {
+      throw new FileNotFoundException(path.toString());
+    }
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                   boolean b, int i, short i1, long l,
+                                   Progressable progressable) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int i,
+                                   Progressable progressable) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public boolean rename(Path path, Path path1) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public boolean delete(Path path, boolean b) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public void setWorkingDirectory(Path path) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return status.getPath().getParent();
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission fsPermission) {
+    throw new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return new FileStatus[]{getFileStatus(path)};
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    if (status.getPath().equals(path)) {
+      return status;
+    } else {
+      throw new FileNotFoundException(path.toString());
+    }
+  }
+}
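
For context, here is a minimal usage sketch of the new adapter; it is not part
of this commit. It follows the class javadoc above and the test below: wrap an
already-open FSDataInputStream whose length is known, then hand the wrapper to
the reader via OrcFile.readerOptions().filesystem(). The class name
OrcStreamExample, the helper countRows, and the path name "stream-file" are
made up for illustration.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;
    import org.apache.orc.util.StreamWrapperFileSystem;

    public class OrcStreamExample {
      /**
       * Count the rows of an ORC file that is only available as an open
       * FSDataInputStream of a known length.
       */
      static long countRows(FSDataInputStream stream, long fileSize,
                            Configuration conf) throws IOException {
        // The path is synthetic; it only needs to match between the wrapper
        // and the createReader() call.
        Path path = new Path("stream-file");
        FileSystem fs = new StreamWrapperFileSystem(stream, path, fileSize, conf);
        long rowCount = 0;
        try (Reader reader = OrcFile.createReader(path,
                 OrcFile.readerOptions(conf).filesystem(fs));
             RecordReader rows = reader.rows()) {
          VectorizedRowBatch batch = reader.getSchema().createRowBatch();
          while (rows.nextBatch(batch)) {
            rowCount += batch.size;
          }
        }
        return rowCount;
      }
    }

Because the wrapper only knows about that single stream, open() and
getFileStatus() throw FileNotFoundException for any other path, and all of the
write operations are unsupported.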
diff --git a/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java b/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java
new file mode 100644
index 0000000..00e7a59
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TestVectorOrcFile;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for StreamWrapperFileSystem.
+ */
+public class TestStreamWrapperFileSystem {
+
+  @Test
+  public void testWrapper() throws IOException {
+    Configuration conf = new Configuration();
+    Path realFilename = new Path(TestVectorOrcFile.getFileFromClasspath(
+        "orc-file-11-format.orc"));
+    FileSystem local = FileSystem.getLocal(conf);
+    FSDataInputStream stream = local.open(realFilename);
+    long fileSize = local.getFileStatus(realFilename).getLen();
+    FileSystem fs = new StreamWrapperFileSystem(stream, new Path("foo"),
+        fileSize, conf);
+    assertSame(stream, fs.open(new Path("foo")));
+    TypeDescription readerSchema =
+        TypeDescription.fromString("struct<boolean1:boolean>");
+    try (Reader reader = OrcFile.createReader(new Path("foo"),
+            OrcFile.readerOptions(conf).filesystem(fs));
+        RecordReader rows = reader.rows(reader.options().schema(readerSchema))) {
+
+      // make sure that the metadata is what we expect
+      assertEquals(7500, reader.getNumberOfRows());
+      assertEquals("struct<boolean1:boolean,byte1:tinyint,short1:smallint," +
+              "int1:int,long1:bigint," + "float1:float,double1:double," +
+              "bytes1:binary,string1:string,middle:struct<list:array<struct<" +
+              "int1:int,string1:string>>>,list:array<struct<int1:int," +
+              "string1:string>>,map:map<string,struct<int1:int," +
+              "string1:string>>,ts:timestamp,decimal1:decimal(38,10)>",
+          reader.getSchema().toString());
+
+      // read the boolean1 column and check the data
+      VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+      LongColumnVector boolean1 = (LongColumnVector) batch.cols[0];
+      int current = 0;
+      for (int r = 0; r < 7500; ++r) {
+        if (current >= batch.size) {
+          assertTrue("row " + r, rows.nextBatch(batch));
+          current = 0;
+        }
+        assertEquals("row " + r, r % 2, boolean1.vector[current++]);
+      }
+    }
+  }
+}
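
For anyone who wants to run the new test by itself: assuming the usual Maven
layout of the ORC java build (the java/ directory with its core module), an
invocation along these lines should work:

    cd java
    mvn test -pl core -am -Dtest=TestStreamWrapperFileSystem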