You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2020/03/07 21:22:15 UTC
[orc] branch master updated: ORC-602. Create adapter for reading
from a FSDataInputStream.
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new e063c16 ORC-602. Create adapter for reading from a FSDataInputStream.
e063c16 is described below
commit e063c1617ecaf18b69d1b103446d309ec247e04b
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon Feb 24 14:34:44 2020 -0800
ORC-602. Create adapter for reading from a FSDataInputStream.
Signed-off-by: Owen O'Malley <om...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/orc/util/StreamWrapperFileSystem.java | 145 +++++++++++++++++++++
.../orc/util/TestStreamWrapperFileSystem.java | 85 ++++++++++++
2 files changed, 230 insertions(+)
diff --git a/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java b/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java
new file mode 100644
index 0000000..def194f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/StreamWrapperFileSystem.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.OrcConf;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * This class provides an adaptor so that tools that want to read an ORC
+ * file from an FSDataInputStream can do so. Create an instance with the
+ * stream, path, and fileSize and pass it in to the reader as the FileSystem.
+ *
+ * <p>Only the single path recorded in the wrapped {@link FileStatus} is
+ * visible: {@link #open} and {@link #getFileStatus} succeed for exactly that
+ * path and throw {@link FileNotFoundException} for any other path. All
+ * mutating operations throw {@link UnsupportedOperationException}.
+ *
+ * <p>{@link #open} hands back the same stream instance on every call; it
+ * does not reopen or reposition it.
+ */
+public class StreamWrapperFileSystem extends FileSystem {
+  private final FSDataInputStream stream;
+  private final FileStatus status;
+
+  /**
+   * Create a FileSystem that only has information about the given stream.
+   * @param stream the data of the stream; returned as-is by {@link #open}
+   * @param status the file status of the stream; its path is the only path
+   *               this file system recognizes
+   * @param conf the configuration to use
+   * @throws NullPointerException if {@code stream} or {@code status} is null
+   */
+  public StreamWrapperFileSystem(FSDataInputStream stream,
+                                 FileStatus status,
+                                 Configuration conf) {
+    // Fail fast: a null here would otherwise surface much later, either as an
+    // NPE in getUri()/open() or as a null stream handed to the reader.
+    this.stream = Objects.requireNonNull(stream, "stream");
+    this.status = Objects.requireNonNull(status, "status");
+    setConf(conf);
+  }
+
+  /**
+   * Create a FileSystem that only has information about the given stream.
+   * The synthesized status describes a regular file with replication 1,
+   * modification time 0, and the block size configured by
+   * {@link OrcConf#BLOCK_SIZE}.
+   * @param stream the data of the stream
+   * @param path the file name of the stream
+   * @param fileSize the length of the stream in bytes
+   * @param conf the configuration to use
+   */
+  public StreamWrapperFileSystem(FSDataInputStream stream,
+                                 Path path,
+                                 long fileSize,
+                                 Configuration conf) {
+    // Block size is a long quantity (FileStatus takes a long); reading it as
+    // an int would reject configured values above Integer.MAX_VALUE.
+    this(stream,
+        new FileStatus(fileSize, false, 1, OrcConf.BLOCK_SIZE.getLong(conf), 0, path),
+        conf);
+  }
+
+  /**
+   * @return a synthetic {@code stream://} URI built from the wrapped path
+   */
+  @Override
+  public URI getUri() {
+    return URI.create("stream://" + status.getPath());
+  }
+
+  /**
+   * Return the wrapped stream. The same instance is returned on every call
+   * and {@code bufferSize} is ignored.
+   * @throws FileNotFoundException if {@code path} is not the wrapped path
+   */
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    requireWrappedPath(path);
+    return stream;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, FsPermission permission,
+                                   boolean overwrite, int bufferSize,
+                                   short replication, long blockSize,
+                                   Progressable progress) {
+    throw readOnly();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path path, int bufferSize,
+                                   Progressable progress) {
+    throw readOnly();
+  }
+
+  @Override
+  public boolean rename(Path source, Path destination) {
+    throw readOnly();
+  }
+
+  @Override
+  public boolean delete(Path path, boolean recursive) {
+    throw readOnly();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path path) {
+    // Not a write operation, so don't report it as one.
+    throw new UnsupportedOperationException(
+        "Changing the working directory is not supported on " +
+        getClass().getName());
+  }
+
+  /**
+   * @return the parent of the wrapped file's path, as computed by
+   *         {@link Path#getParent()}
+   */
+  @Override
+  public Path getWorkingDirectory() {
+    return status.getPath().getParent();
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission) {
+    throw readOnly();
+  }
+
+  /**
+   * @return a one-element array holding the wrapped file's status
+   * @throws FileNotFoundException if {@code path} is not the wrapped path
+   */
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return new FileStatus[]{getFileStatus(path)};
+  }
+
+  /**
+   * @return the wrapped file's status
+   * @throws FileNotFoundException if {@code path} is not the wrapped path
+   */
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    requireWrappedPath(path);
+    return status;
+  }
+
+  /** Throw FileNotFoundException unless {@code path} equals the wrapped path. */
+  private void requireWrappedPath(Path path) throws FileNotFoundException {
+    if (!status.getPath().equals(path)) {
+      // String.valueOf keeps a null path from turning into an NPE here.
+      throw new FileNotFoundException(String.valueOf(path));
+    }
+  }
+
+  /** Build the exception reported by every mutating operation. */
+  private UnsupportedOperationException readOnly() {
+    return new UnsupportedOperationException("Write operations on " +
+        getClass().getName());
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java b/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java
new file mode 100644
index 0000000..00e7a59
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/util/TestStreamWrapperFileSystem.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TestVectorOrcFile;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that {@link StreamWrapperFileSystem} can stand in for a real
+ * FileSystem when the ORC reader is only given an already-open stream.
+ */
+public class TestStreamWrapperFileSystem {
+
+  /** Row count asserted for the orc-file-11-format.orc test resource. */
+  private static final int EXPECTED_ROWS = 7500;
+
+  /** File schema asserted for the orc-file-11-format.orc test resource. */
+  private static final String EXPECTED_SCHEMA =
+      "struct<boolean1:boolean,byte1:tinyint,short1:smallint," +
+      "int1:int,long1:bigint,float1:float,double1:double," +
+      "bytes1:binary,string1:string,middle:struct<list:array<struct<" +
+      "int1:int,string1:string>>>,list:array<struct<int1:int," +
+      "string1:string>>,map:map<string,struct<int1:int," +
+      "string1:string>>,ts:timestamp,decimal1:decimal(38,10)>";
+
+  @Test
+  public void testWrapper() throws IOException {
+    Configuration conf = new Configuration();
+    Path actualFile = new Path(TestVectorOrcFile.getFileFromClasspath(
+        "orc-file-11-format.orc"));
+    FileSystem localFs = FileSystem.getLocal(conf);
+    FSDataInputStream in = localFs.open(actualFile);
+    long length = localFs.getFileStatus(actualFile).getLen();
+
+    // Expose the stream under a made-up name unrelated to the real file.
+    Path alias = new Path("foo");
+    FileSystem wrapped = new StreamWrapperFileSystem(in, alias, length, conf);
+    assertSame(in, wrapped.open(alias));
+
+    TypeDescription projection =
+        TypeDescription.fromString("struct<boolean1:boolean>");
+    OrcFile.ReaderOptions options =
+        OrcFile.readerOptions(conf).filesystem(wrapped);
+    try (Reader reader = OrcFile.createReader(alias, options);
+         RecordReader rows = reader.rows(reader.options().schema(projection))) {
+
+      // The footer metadata must be readable through the wrapper.
+      assertEquals(EXPECTED_ROWS, reader.getNumberOfRows());
+      assertEquals(EXPECTED_SCHEMA, reader.getSchema().toString());
+
+      // Only boolean1 is projected; the test expects it to equal row % 2.
+      VectorizedRowBatch batch = projection.createRowBatchV2();
+      LongColumnVector values = (LongColumnVector) batch.cols[0];
+      for (int row = 0, pos = 0; row < EXPECTED_ROWS; ++row, ++pos) {
+        if (pos >= batch.size) {
+          // Batch exhausted (or nothing read yet): pull the next one.
+          assertTrue("row " + row, rows.nextBatch(batch));
+          pos = 0;
+        }
+        assertEquals("row " + row, row % 2, values.vector[pos]);
+      }
+    }
+  }
+}