You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/10 20:57:34 UTC
orc git commit: ORC-176: Refactor common classes for writer and reader
Repository: orc
Updated Branches:
refs/heads/master 90f138b06 -> 77ed66829
ORC-176: Refactor common classes for writer and reader
This is mainly a refactoring change for ORC-176, including:
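1. Extracted the classes and functions shared by the reader and writer into orc/Common.hh and Common.cc;
2. Moved the SeekableInputStream interface and its implementations (together with PositionProvider and printBuffer) from Compression.hh into io/InputStream.hh and io/InputStream.cc.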
Fixes #118
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/77ed6682
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/77ed6682
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/77ed6682
Branch: refs/heads/master
Commit: 77ed6682924462643ac8189adee8dae8eed1ae54
Parents: 90f138b
Author: Gang Wu <ga...@alibaba-inc.com>
Authored: Wed May 10 13:43:05 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed May 10 13:56:56 2017 -0700
----------------------------------------------------------------------
c++/include/orc/Common.hh | 167 +++++++++++++++++++++++++++++++
c++/include/orc/Reader.hh | 137 +------------------------
c++/src/CMakeLists.txt | 2 +
c++/src/Common.cc | 107 ++++++++++++++++++++
c++/src/Compression.cc | 195 ------------------------------------
c++/src/Compression.hh | 92 +----------------
c++/src/Reader.cc | 82 ---------------
c++/src/io/InputStream.cc | 222 +++++++++++++++++++++++++++++++++++++++++
c++/src/io/InputStream.hh | 116 +++++++++++++++++++++
9 files changed, 616 insertions(+), 504 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/include/orc/Common.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
new file mode 100644
index 0000000..f499b81
--- /dev/null
+++ b/c++/include/orc/Common.hh
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_COMMON_HH
+#define ORC_COMMON_HH
+
+#include "orc/Vector.hh"
+#include "orc/Type.hh"
+#include "Exceptions.hh"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <string>
+
+namespace orc {
+ enum CompressionKind {
+ CompressionKind_NONE = 0,
+ CompressionKind_ZLIB = 1,
+ CompressionKind_SNAPPY = 2,
+ CompressionKind_LZO = 3,
+ CompressionKind_LZ4 = 4,
+ CompressionKind_ZSTD = 5,
+ CompressionKind_MAX = INT64_MAX
+ };
+
+ /**
+ * Get the name of the CompressionKind.
+ */
+ std::string compressionKindToString(CompressionKind kind);
+
+ enum WriterVersion {
+ WriterVersion_ORIGINAL = 0,
+ WriterVersion_HIVE_8732 = 1,
+ WriterVersion_HIVE_4243 = 2,
+ WriterVersion_HIVE_12055 = 3,
+ WriterVersion_HIVE_13083 = 4,
+ WriterVersion_ORC_101 = 5,
+ WriterVersion_ORC_135 = 6,
+ WriterVersion_MAX = INT64_MAX
+ };
+
+ /**
+ * Get the name of the WriterVersion.
+ */
+ std::string writerVersionToString(WriterVersion kind);
+
+ enum StreamKind {
+ StreamKind_PRESENT = 0,
+ StreamKind_DATA = 1,
+ StreamKind_LENGTH = 2,
+ StreamKind_DICTIONARY_DATA = 3,
+ StreamKind_DICTIONARY_COUNT = 4,
+ StreamKind_SECONDARY = 5,
+ StreamKind_ROW_INDEX = 6,
+ StreamKind_BLOOM_FILTER = 7
+ };
+
+ /**
+ * Get the string representation of the StreamKind.
+ */
+ std::string streamKindToString(StreamKind kind);
+
+ class StreamInformation {
+ public:
+ virtual ~StreamInformation();
+
+ virtual StreamKind getKind() const = 0;
+ virtual uint64_t getColumnId() const = 0;
+ virtual uint64_t getOffset() const = 0;
+ virtual uint64_t getLength() const = 0;
+ };
+
+ enum ColumnEncodingKind {
+ ColumnEncodingKind_DIRECT = 0,
+ ColumnEncodingKind_DICTIONARY = 1,
+ ColumnEncodingKind_DIRECT_V2 = 2,
+ ColumnEncodingKind_DICTIONARY_V2 = 3
+ };
+
+ std::string columnEncodingKindToString(ColumnEncodingKind kind);
+
+ class StripeInformation {
+ public:
+ virtual ~StripeInformation();
+
+ /**
+ * Get the byte offset of the start of the stripe.
+ * @return the bytes from the start of the file
+ */
+ virtual uint64_t getOffset() const = 0;
+
+ /**
+ * Get the total length of the stripe in bytes.
+ * @return the number of bytes in the stripe
+ */
+ virtual uint64_t getLength() const = 0;
+
+ /**
+ * Get the length of the stripe's indexes.
+ * @return the number of bytes in the index
+ */
+ virtual uint64_t getIndexLength() const = 0;
+
+ /**
+ * Get the length of the stripe's data.
+ * @return the number of bytes in the stripe
+ */
+ virtual uint64_t getDataLength()const = 0;
+
+ /**
+ * Get the length of the stripe's tail section, which contains its index.
+ * @return the number of bytes in the tail
+ */
+ virtual uint64_t getFooterLength() const = 0;
+
+ /**
+ * Get the number of rows in the stripe.
+ * @return a count of the number of rows
+ */
+ virtual uint64_t getNumberOfRows() const = 0;
+
+ /**
+ * Get the number of streams in the stripe.
+ */
+ virtual uint64_t getNumberOfStreams() const = 0;
+
+ /**
+ * Get the StreamInformation for the given stream.
+ */
+ virtual ORC_UNIQUE_PTR<StreamInformation>
+ getStreamInformation(uint64_t streamId) const = 0;
+
+ /**
+ * Get the column encoding for the given column.
+ * @param colId the columnId
+ */
+ virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0;
+
+ /**
+ * Get the dictionary size.
+ * @param colId the columnId
+ * @return the size of the dictionary or 0 if there isn't one
+ */
+ virtual uint64_t getDictionarySize(uint64_t colId) const = 0;
+
+ /**
+ * Get the writer timezone.
+ */
+ virtual const std::string& getWriterTimezone() const = 0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/include/orc/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 76d7853..3912bd7 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -19,6 +19,7 @@
#ifndef ORC_READER_HH
#define ORC_READER_HH
+#include "orc/Common.hh"
#include "orc/orc-config.hh"
#include "orc/Statistics.hh"
#include "orc/Type.hh"
@@ -34,142 +35,6 @@ namespace orc {
struct ReaderOptionsPrivate;
struct RowReaderOptionsPrivate;
- enum CompressionKind {
- CompressionKind_NONE = 0,
- CompressionKind_ZLIB = 1,
- CompressionKind_SNAPPY = 2,
- CompressionKind_LZO = 3,
- CompressionKind_LZ4 = 4,
- CompressionKind_ZSTD = 5,
- CompressionKind_MAX = INT64_MAX
- };
-
- /**
- * Get the name of the CompressionKind.
- */
- std::string compressionKindToString(CompressionKind kind);
-
- enum WriterVersion {
- WriterVersion_ORIGINAL = 0,
- WriterVersion_HIVE_8732 = 1,
- WriterVersion_HIVE_4243 = 2,
- WriterVersion_HIVE_12055 = 3,
- WriterVersion_HIVE_13083 = 4,
- WriterVersion_ORC_101 = 5,
- WriterVersion_ORC_135 = 6,
- WriterVersion_MAX = INT64_MAX
- };
-
- /**
- * Get the name of the WriterVersion.
- */
- std::string writerVersionToString(WriterVersion kind);
-
- enum StreamKind {
- StreamKind_PRESENT = 0,
- StreamKind_DATA = 1,
- StreamKind_LENGTH = 2,
- StreamKind_DICTIONARY_DATA = 3,
- StreamKind_DICTIONARY_COUNT = 4,
- StreamKind_SECONDARY = 5,
- StreamKind_ROW_INDEX = 6,
- StreamKind_BLOOM_FILTER = 7
- };
-
- /**
- * Get the string representation of the StreamKind.
- */
- std::string streamKindToString(StreamKind kind);
-
- class StreamInformation {
- public:
- virtual ~StreamInformation();
-
- virtual StreamKind getKind() const = 0;
- virtual uint64_t getColumnId() const = 0;
- virtual uint64_t getOffset() const = 0;
- virtual uint64_t getLength() const = 0;
- };
-
- enum ColumnEncodingKind {
- ColumnEncodingKind_DIRECT = 0,
- ColumnEncodingKind_DICTIONARY = 1,
- ColumnEncodingKind_DIRECT_V2 = 2,
- ColumnEncodingKind_DICTIONARY_V2 = 3
- };
-
- std::string columnEncodingKindToString(ColumnEncodingKind kind);
-
- class StripeInformation {
- public:
- virtual ~StripeInformation();
-
- /**
- * Get the byte offset of the start of the stripe.
- * @return the bytes from the start of the file
- */
- virtual uint64_t getOffset() const = 0;
-
- /**
- * Get the total length of the stripe in bytes.
- * @return the number of bytes in the stripe
- */
- virtual uint64_t getLength() const = 0;
-
- /**
- * Get the length of the stripe's indexes.
- * @return the number of bytes in the index
- */
- virtual uint64_t getIndexLength() const = 0;
-
- /**
- * Get the length of the stripe's data.
- * @return the number of bytes in the stripe
- */
- virtual uint64_t getDataLength()const = 0;
-
- /**
- * Get the length of the stripe's tail section, which contains its index.
- * @return the number of bytes in the tail
- */
- virtual uint64_t getFooterLength() const = 0;
-
- /**
- * Get the number of rows in the stripe.
- * @return a count of the number of rows
- */
- virtual uint64_t getNumberOfRows() const = 0;
-
- /**
- * Get the number of streams in the stripe.
- */
- virtual uint64_t getNumberOfStreams() const = 0;
-
- /**
- * Get the StreamInformation for the given stream.
- */
- virtual ORC_UNIQUE_PTR<StreamInformation>
- getStreamInformation(uint64_t streamId) const = 0;
-
- /**
- * Get the column encoding for the given column.
- * @param colId the columnId
- */
- virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0;
-
- /**
- * Get the dictionary size.
- * @param colId the columnId
- * @return the size of the dictionary or 0 if there isn't one
- */
- virtual uint64_t getDictionarySize(uint64_t colId) const = 0;
-
- /**
- * Get the writer timezone.
- */
- virtual const std::string& getWriterTimezone() const = 0;
- };
-
/**
* Options for creating a Reader.
*/
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index d717fb4..c39437a 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -135,10 +135,12 @@ add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc
add_library (orc STATIC
"${CMAKE_CURRENT_BINARY_DIR}/Adaptor.hh"
orc_proto.pb.h
+ io/InputStream.cc
wrap/orc-proto-wrapper.cc
ByteRLE.cc
ColumnPrinter.cc
ColumnReader.cc
+ Common.cc
Compression.cc
Exceptions.cc
Int128.cc
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Common.cc
----------------------------------------------------------------------
diff --git a/c++/src/Common.cc b/c++/src/Common.cc
new file mode 100644
index 0000000..7813612
--- /dev/null
+++ b/c++/src/Common.cc
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Common.hh"
+
+#include <sstream>
+
+namespace orc {
+
+ std::string compressionKindToString(CompressionKind kind) {
+ switch (static_cast<int>(kind)) {
+ case CompressionKind_NONE:
+ return "none";
+ case CompressionKind_ZLIB:
+ return "zlib";
+ case CompressionKind_SNAPPY:
+ return "snappy";
+ case CompressionKind_LZO:
+ return "lzo";
+ case CompressionKind_LZ4:
+ return "lz4";
+ case CompressionKind_ZSTD:
+ return "zstd";
+ }
+ std::stringstream buffer;
+ buffer << "unknown - " << kind;
+ return buffer.str();
+ }
+
+ std::string writerVersionToString(WriterVersion version) {
+ switch (static_cast<int>(version)) {
+ case WriterVersion_ORIGINAL:
+ return "original";
+ case WriterVersion_HIVE_8732:
+ return "HIVE-8732";
+ case WriterVersion_HIVE_4243:
+ return "HIVE-4243";
+ case WriterVersion_HIVE_12055:
+ return "HIVE-12055";
+ case WriterVersion_HIVE_13083:
+ return "HIVE-13083";
+ case WriterVersion_ORC_101:
+ return "ORC-101";
+ case WriterVersion_ORC_135:
+ return "ORC-135";
+ }
+ std::stringstream buffer;
+ buffer << "future - " << version;
+ return buffer.str();
+ }
+
+ std::string streamKindToString(StreamKind kind) {
+ switch (static_cast<int>(kind)) {
+ case StreamKind_PRESENT:
+ return "present";
+ case StreamKind_DATA:
+ return "data";
+ case StreamKind_LENGTH:
+ return "length";
+ case StreamKind_DICTIONARY_DATA:
+ return "dictionary";
+ case StreamKind_DICTIONARY_COUNT:
+ return "dictionary count";
+ case StreamKind_SECONDARY:
+ return "secondary";
+ case StreamKind_ROW_INDEX:
+ return "index";
+ case StreamKind_BLOOM_FILTER:
+ return "bloom";
+ }
+ std::stringstream buffer;
+ buffer << "unknown - " << kind;
+ return buffer.str();
+ }
+
+ std::string columnEncodingKindToString(ColumnEncodingKind kind) {
+ switch (static_cast<int>(kind)) {
+ case ColumnEncodingKind_DIRECT:
+ return "direct";
+ case ColumnEncodingKind_DICTIONARY:
+ return "dictionary";
+ case ColumnEncodingKind_DIRECT_V2:
+ return "direct rle2";
+ case ColumnEncodingKind_DICTIONARY_V2:
+ return "dictionary rle2";
+ }
+ std::stringstream buffer;
+ buffer << "unknown - " << kind;
+ return buffer.str();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 81cc578..e2f9dbf 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -33,201 +33,6 @@
namespace orc {
- void printBuffer(std::ostream& out,
- const char *buffer,
- uint64_t length) {
- const uint64_t width = 24;
- out << std::hex;
- for(uint64_t line = 0; line < (length + width - 1) / width; ++line) {
- out << std::setfill('0') << std::setw(7) << (line * width);
- for(uint64_t byte = 0;
- byte < width && line * width + byte < length; ++byte) {
- out << " " << std::setfill('0') << std::setw(2)
- << static_cast<uint64_t>(0xff & buffer[line * width +
- byte]);
- }
- out << "\n";
- }
- out << std::dec;
- }
-
- PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
- position = posns.begin();
- }
-
- uint64_t PositionProvider::next() {
- uint64_t result = *position;
- ++position;
- return result;
- }
-
- SeekableInputStream::~SeekableInputStream() {
- // PASS
- }
-
- SeekableArrayInputStream::~SeekableArrayInputStream() {
- // PASS
- }
-
- SeekableArrayInputStream::SeekableArrayInputStream
- (const unsigned char* values,
- uint64_t size,
- uint64_t blkSize
- ): data(reinterpret_cast<const char*>(values)) {
- length = size;
- position = 0;
- blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
- }
-
- SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
- uint64_t size,
- uint64_t blkSize
- ): data(values) {
- length = size;
- position = 0;
- blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
- }
-
- bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
- uint64_t currentSize = std::min(length - position, blockSize);
- if (currentSize > 0) {
- *buffer = data + position;
- *size = static_cast<int>(currentSize);
- position += currentSize;
- return true;
- }
- *size = 0;
- return false;
- }
-
- void SeekableArrayInputStream::BackUp(int count) {
- if (count >= 0) {
- uint64_t unsignedCount = static_cast<uint64_t>(count);
- if (unsignedCount <= blockSize && unsignedCount <= position) {
- position -= unsignedCount;
- } else {
- throw std::logic_error("Can't backup that much!");
- }
- }
- }
-
- bool SeekableArrayInputStream::Skip(int count) {
- if (count >= 0) {
- uint64_t unsignedCount = static_cast<uint64_t>(count);
- if (unsignedCount + position <= length) {
- position += unsignedCount;
- return true;
- } else {
- position = length;
- }
- }
- return false;
- }
-
- google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
- return static_cast<google::protobuf::int64>(position);
- }
-
- void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
- position = seekPosition.next();
- }
-
- std::string SeekableArrayInputStream::getName() const {
- std::ostringstream result;
- result << "SeekableArrayInputStream " << position << " of " << length;
- return result.str();
- }
-
- static uint64_t computeBlock(uint64_t request, uint64_t length) {
- return std::min(length, request == 0 ? 256 * 1024 : request);
- }
-
- SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
- uint64_t offset,
- uint64_t byteCount,
- MemoryPool& _pool,
- uint64_t _blockSize
- ):pool(_pool),
- input(stream),
- start(offset),
- length(byteCount),
- blockSize(computeBlock
- (_blockSize,
- length)) {
-
- position = 0;
- buffer.reset(new DataBuffer<char>(pool));
- pushBack = 0;
- }
-
- SeekableFileInputStream::~SeekableFileInputStream() {
- // PASS
- }
-
- bool SeekableFileInputStream::Next(const void** data, int*size) {
- uint64_t bytesRead;
- if (pushBack != 0) {
- *data = buffer->data() + (buffer->size() - pushBack);
- bytesRead = pushBack;
- } else {
- bytesRead = std::min(length - position, blockSize);
- buffer->resize(bytesRead);
- if (bytesRead > 0) {
- input->read(buffer->data(), bytesRead, start+position);
- *data = static_cast<void*>(buffer->data());
- }
- }
- position += bytesRead;
- pushBack = 0;
- *size = static_cast<int>(bytesRead);
- return bytesRead != 0;
- }
-
- void SeekableFileInputStream::BackUp(int signedCount) {
- if (signedCount < 0) {
- throw std::logic_error("can't backup negative distances");
- }
- uint64_t count = static_cast<uint64_t>(signedCount);
- if (pushBack > 0) {
- throw std::logic_error("can't backup unless we just called Next");
- }
- if (count > blockSize || count > position) {
- throw std::logic_error("can't backup that far");
- }
- pushBack = static_cast<uint64_t>(count);
- position -= pushBack;
- }
-
- bool SeekableFileInputStream::Skip(int signedCount) {
- if (signedCount < 0) {
- return false;
- }
- uint64_t count = static_cast<uint64_t>(signedCount);
- position = std::min(position + count, length);
- pushBack = 0;
- return position < length;
- }
-
- int64_t SeekableFileInputStream::ByteCount() const {
- return static_cast<int64_t>(position);
- }
-
- void SeekableFileInputStream::seek(PositionProvider& location) {
- position = location.next();
- if (position > length) {
- position = length;
- throw std::logic_error("seek too far");
- }
- pushBack = 0;
- }
-
- std::string SeekableFileInputStream::getName() const {
- std::ostringstream result;
- result << input->getName() << " from " << start << " for "
- << length;
- return result.str();
- }
-
enum DecompressState { DECOMPRESS_HEADER,
DECOMPRESS_START,
DECOMPRESS_CONTINUE,
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index efd374a..8c3eda7 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -19,100 +19,10 @@
#ifndef ORC_COMPRESSION_HH
#define ORC_COMPRESSION_HH
-#include "orc/OrcFile.hh"
-
-#include "Adaptor.hh"
-#include "wrap/zero-copy-stream-wrapper.h"
-
-#include <list>
-#include <vector>
-#include <fstream>
-#include <iostream>
-#include <sstream>
-#include <memory>
+#include "io/InputStream.hh"
namespace orc {
- void printBuffer(std::ostream& out,
- const char *buffer,
- uint64_t length);
-
- class PositionProvider {
- private:
- std::list<uint64_t>::const_iterator position;
- public:
- PositionProvider(const std::list<uint64_t>& positions);
- uint64_t next();
- };
-
- /**
- * A subclass of Google's ZeroCopyInputStream that supports seek.
- * By extending Google's class, we get the ability to pass it directly
- * to the protobuf readers.
- */
- class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
- public:
- virtual ~SeekableInputStream();
- virtual void seek(PositionProvider& position) = 0;
- virtual std::string getName() const = 0;
- };
-
- /**
- * Create a seekable input stream based on a memory range.
- */
- class SeekableArrayInputStream: public SeekableInputStream {
- private:
- const char* data;
- uint64_t length;
- uint64_t position;
- uint64_t blockSize;
-
- public:
- SeekableArrayInputStream(const unsigned char* list,
- uint64_t length,
- uint64_t block_size = 0);
- SeekableArrayInputStream(const char* list,
- uint64_t length,
- uint64_t block_size = 0);
- virtual ~SeekableArrayInputStream();
- virtual bool Next(const void** data, int*size) override;
- virtual void BackUp(int count) override;
- virtual bool Skip(int count) override;
- virtual google::protobuf::int64 ByteCount() const override;
- virtual void seek(PositionProvider& position) override;
- virtual std::string getName() const override;
- };
-
- /**
- * Create a seekable input stream based on an input stream.
- */
- class SeekableFileInputStream: public SeekableInputStream {
- private:
- MemoryPool& pool;
- InputStream* const input;
- const uint64_t start;
- const uint64_t length;
- const uint64_t blockSize;
- std::unique_ptr<DataBuffer<char> > buffer;
- uint64_t position;
- uint64_t pushBack;
-
- public:
- SeekableFileInputStream(InputStream* input,
- uint64_t offset,
- uint64_t byteCount,
- MemoryPool& pool,
- uint64_t blockSize = 0);
- virtual ~SeekableFileInputStream();
-
- virtual bool Next(const void** data, int*size) override;
- virtual void BackUp(int count) override;
- virtual bool Skip(int count) override;
- virtual int64_t ByteCount() const override;
- virtual void seek(PositionProvider& position) override;
- virtual std::string getName() const override;
- };
-
/**
* Create a decompressor for the given compression kind.
* @param kind the compression type to implement
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 9c423bd..fe8608a 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -35,48 +35,6 @@
namespace orc {
- std::string compressionKindToString(CompressionKind kind) {
- switch (static_cast<int>(kind)) {
- case CompressionKind_NONE:
- return "none";
- case CompressionKind_ZLIB:
- return "zlib";
- case CompressionKind_SNAPPY:
- return "snappy";
- case CompressionKind_LZO:
- return "lzo";
- case CompressionKind_LZ4:
- return "lz4";
- case CompressionKind_ZSTD:
- return "zstd";
- }
- std::stringstream buffer;
- buffer << "unknown - " << kind;
- return buffer.str();
- }
-
- std::string writerVersionToString(WriterVersion version) {
- switch (static_cast<int>(version)) {
- case WriterVersion_ORIGINAL:
- return "original";
- case WriterVersion_HIVE_8732:
- return "HIVE-8732";
- case WriterVersion_HIVE_4243:
- return "HIVE-4243";
- case WriterVersion_HIVE_12055:
- return "HIVE-12055";
- case WriterVersion_HIVE_13083:
- return "HIVE-13083";
- case WriterVersion_ORC_101:
- return "ORC-101";
- case WriterVersion_ORC_135:
- return "ORC-135";
- }
- std::stringstream buffer;
- buffer << "future - " << version;
- return buffer.str();
- }
-
uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
if (ps.has_compressionblocksize()) {
return ps.compressionblocksize();
@@ -961,46 +919,6 @@ namespace orc {
postscriptLength));
}
- std::string streamKindToString(StreamKind kind) {
- switch (static_cast<int>(kind)) {
- case StreamKind_PRESENT:
- return "present";
- case StreamKind_DATA:
- return "data";
- case StreamKind_LENGTH:
- return "length";
- case StreamKind_DICTIONARY_DATA:
- return "dictionary";
- case StreamKind_DICTIONARY_COUNT:
- return "dictionary count";
- case StreamKind_SECONDARY:
- return "secondary";
- case StreamKind_ROW_INDEX:
- return "index";
- case StreamKind_BLOOM_FILTER:
- return "bloom";
- }
- std::stringstream buffer;
- buffer << "unknown - " << kind;
- return buffer.str();
- }
-
- std::string columnEncodingKindToString(ColumnEncodingKind kind) {
- switch (static_cast<int>(kind)) {
- case ColumnEncodingKind_DIRECT:
- return "direct";
- case ColumnEncodingKind_DICTIONARY:
- return "dictionary";
- case ColumnEncodingKind_DIRECT_V2:
- return "direct rle2";
- case ColumnEncodingKind_DICTIONARY_V2:
- return "dictionary rle2";
- }
- std::stringstream buffer;
- buffer << "unknown - " << kind;
- return buffer.str();
- }
-
RowReader::~RowReader() {
// PASS
}
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/io/InputStream.cc
----------------------------------------------------------------------
diff --git a/c++/src/io/InputStream.cc b/c++/src/io/InputStream.cc
new file mode 100644
index 0000000..fd91b23
--- /dev/null
+++ b/c++/src/io/InputStream.cc
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exceptions.hh"
+#include "InputStream.hh"
+
+#include <algorithm>
+#include <iomanip>
+
+namespace orc {
+
+ void printBuffer(std::ostream& out,
+ const char *buffer,
+ uint64_t length) {
+ const uint64_t width = 24;
+ out << std::hex;
+ for(uint64_t line = 0; line < (length + width - 1) / width; ++line) {
+ out << std::setfill('0') << std::setw(7) << (line * width);
+ for(uint64_t byte = 0;
+ byte < width && line * width + byte < length; ++byte) {
+ out << " " << std::setfill('0') << std::setw(2)
+ << static_cast<uint64_t>(0xff & buffer[line * width +
+ byte]);
+ }
+ out << "\n";
+ }
+ out << std::dec;
+ }
+
+ PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
+ position = posns.begin();
+ }
+
+ uint64_t PositionProvider::next() {
+ uint64_t result = *position;
+ ++position;
+ return result;
+ }
+
+ SeekableInputStream::~SeekableInputStream() {
+ // PASS
+ }
+
+ SeekableArrayInputStream::~SeekableArrayInputStream() {
+ // PASS
+ }
+
+ SeekableArrayInputStream::SeekableArrayInputStream
+ (const unsigned char* values,
+ uint64_t size,
+ uint64_t blkSize
+ ): data(reinterpret_cast<const char*>(values)) {
+ length = size;
+ position = 0;
+ blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
+ }
+
+ SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
+ uint64_t size,
+ uint64_t blkSize
+ ): data(values) {
+ length = size;
+ position = 0;
+ blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
+ }
+
+ bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
+ uint64_t currentSize = std::min(length - position, blockSize);
+ if (currentSize > 0) {
+ *buffer = data + position;
+ *size = static_cast<int>(currentSize);
+ position += currentSize;
+ return true;
+ }
+ *size = 0;
+ return false;
+ }
+
+ void SeekableArrayInputStream::BackUp(int count) {
+ if (count >= 0) {
+ uint64_t unsignedCount = static_cast<uint64_t>(count);
+ if (unsignedCount <= blockSize && unsignedCount <= position) {
+ position -= unsignedCount;
+ } else {
+ throw std::logic_error("Can't backup that much!");
+ }
+ }
+ }
+
+ bool SeekableArrayInputStream::Skip(int count) {
+ if (count >= 0) {
+ uint64_t unsignedCount = static_cast<uint64_t>(count);
+ if (unsignedCount + position <= length) {
+ position += unsignedCount;
+ return true;
+ } else {
+ position = length;
+ }
+ }
+ return false;
+ }
+
+ google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
+ return static_cast<google::protobuf::int64>(position);
+ }
+
+ void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
+ position = seekPosition.next();
+ }
+
+ std::string SeekableArrayInputStream::getName() const {
+ std::ostringstream result;
+ result << "SeekableArrayInputStream " << position << " of " << length;
+ return result.str();
+ }
+
+ static uint64_t computeBlock(uint64_t request, uint64_t length) {
+ return std::min(length, request == 0 ? 256 * 1024 : request);
+ }
+
+ SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
+ uint64_t offset,
+ uint64_t byteCount,
+ MemoryPool& _pool,
+ uint64_t _blockSize
+ ):pool(_pool),
+ input(stream),
+ start(offset),
+ length(byteCount),
+ blockSize(computeBlock
+ (_blockSize,
+ length)) {
+
+ position = 0;
+ buffer.reset(new DataBuffer<char>(pool));
+ pushBack = 0;
+ }
+
+ SeekableFileInputStream::~SeekableFileInputStream() {
+ // PASS
+ }
+
+ bool SeekableFileInputStream::Next(const void** data, int*size) {
+ uint64_t bytesRead;
+ if (pushBack != 0) {
+ *data = buffer->data() + (buffer->size() - pushBack);
+ bytesRead = pushBack;
+ } else {
+ bytesRead = std::min(length - position, blockSize);
+ buffer->resize(bytesRead);
+ if (bytesRead > 0) {
+ input->read(buffer->data(), bytesRead, start+position);
+ *data = static_cast<void*>(buffer->data());
+ }
+ }
+ position += bytesRead;
+ pushBack = 0;
+ *size = static_cast<int>(bytesRead);
+ return bytesRead != 0;
+ }
+
+ void SeekableFileInputStream::BackUp(int signedCount) {
+ if (signedCount < 0) {
+ throw std::logic_error("can't backup negative distances");
+ }
+ uint64_t count = static_cast<uint64_t>(signedCount);
+ if (pushBack > 0) {
+ throw std::logic_error("can't backup unless we just called Next");
+ }
+ if (count > blockSize || count > position) {
+ throw std::logic_error("can't backup that far");
+ }
+ pushBack = static_cast<uint64_t>(count);
+ position -= pushBack;
+ }
+
+ bool SeekableFileInputStream::Skip(int signedCount) {
+ if (signedCount < 0) {
+ return false;
+ }
+ uint64_t count = static_cast<uint64_t>(signedCount);
+ position = std::min(position + count, length);
+ pushBack = 0;
+ return position < length;
+ }
+
+ int64_t SeekableFileInputStream::ByteCount() const {
+ return static_cast<int64_t>(position);
+ }
+
+ void SeekableFileInputStream::seek(PositionProvider& location) {
+ position = location.next();
+ if (position > length) {
+ position = length;
+ throw std::logic_error("seek too far");
+ }
+ pushBack = 0;
+ }
+
+ std::string SeekableFileInputStream::getName() const {
+ std::ostringstream result;
+ result << input->getName() << " from " << start << " for "
+ << length;
+ return result.str();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/io/InputStream.hh
----------------------------------------------------------------------
diff --git a/c++/src/io/InputStream.hh b/c++/src/io/InputStream.hh
new file mode 100644
index 0000000..5ea8c7e
--- /dev/null
+++ b/c++/src/io/InputStream.hh
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_INPUTSTREAM_HH
+#define ORC_INPUTSTREAM_HH
+
+#include "Adaptor.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/zero-copy-stream-wrapper.h"
+
+#include <list>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+namespace orc {
+
+ void printBuffer(std::ostream& out,
+ const char *buffer,
+ uint64_t length);
+
+ class PositionProvider {
+ private:
+ std::list<uint64_t>::const_iterator position;
+ public:
+ PositionProvider(const std::list<uint64_t>& positions);
+ uint64_t next();
+ };
+
+ /**
+ * A subclass of Google's ZeroCopyInputStream that supports seek.
+ * By extending Google's class, we get the ability to pass it directly
+ * to the protobuf readers.
+ */
+ class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
+ public:
+ virtual ~SeekableInputStream();
+ virtual void seek(PositionProvider& position) = 0;
+ virtual std::string getName() const = 0;
+ };
+
+ /**
+ * Create a seekable input stream based on a memory range.
+ */
+ class SeekableArrayInputStream: public SeekableInputStream {
+ private:
+ const char* data;
+ uint64_t length;
+ uint64_t position;
+ uint64_t blockSize;
+
+ public:
+ SeekableArrayInputStream(const unsigned char* list,
+ uint64_t length,
+ uint64_t block_size = 0);
+ SeekableArrayInputStream(const char* list,
+ uint64_t length,
+ uint64_t block_size = 0);
+ virtual ~SeekableArrayInputStream();
+ virtual bool Next(const void** data, int*size) override;
+ virtual void BackUp(int count) override;
+ virtual bool Skip(int count) override;
+ virtual google::protobuf::int64 ByteCount() const override;
+ virtual void seek(PositionProvider& position) override;
+ virtual std::string getName() const override;
+ };
+
+ /**
+ * Create a seekable input stream based on an input stream.
+ */
+ class SeekableFileInputStream: public SeekableInputStream {
+ private:
+ MemoryPool& pool;
+ InputStream* const input;
+ const uint64_t start;
+ const uint64_t length;
+ const uint64_t blockSize;
+ std::unique_ptr<DataBuffer<char> > buffer;
+ uint64_t position;
+ uint64_t pushBack;
+
+ public:
+ SeekableFileInputStream(InputStream* input,
+ uint64_t offset,
+ uint64_t byteCount,
+ MemoryPool& pool,
+ uint64_t blockSize = 0);
+ virtual ~SeekableFileInputStream();
+
+ virtual bool Next(const void** data, int*size) override;
+ virtual void BackUp(int count) override;
+ virtual bool Skip(int count) override;
+ virtual int64_t ByteCount() const override;
+ virtual void seek(PositionProvider& position) override;
+ virtual std::string getName() const override;
+ };
+
+}
+
+#endif //ORC_INPUTSTREAM_HH