You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2015/07/06 23:52:47 UTC
[18/23] orc git commit: ORC-23. Simplify directory structure.
http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/ColumnReader.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/ColumnReader.cc b/c++/src/orc/ColumnReader.cc
deleted file mode 100644
index 0b6a9cb..0000000
--- a/c++/src/orc/ColumnReader.cc
+++ /dev/null
@@ -1,1557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "orc/Adaptor.hh"
-#include "ByteRLE.hh"
-#include "ColumnReader.hh"
-#include "Exceptions.hh"
-#include "orc/Int128.hh"
-#include "RLE.hh"
-
-#include <math.h>
-#include <iostream>
-
-namespace orc {
-
- // Out-of-line destructor for StripeStreams; the body is intentionally empty.
- StripeStreams::~StripeStreams() {
- }
-
- /**
-  * Translate a column encoding kind into the matching RLE version.
-  * @param kind the encoding kind recorded for the column
-  * @return RleVersion_1 for DIRECT and DICTIONARY, RleVersion_2 for
-  *         DIRECT_V2 and DICTIONARY_V2
-  * @throws ParseError for any other encoding kind
-  */
- inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
-   const int64_t code = static_cast<int64_t>(kind);
-   if (code == proto::ColumnEncoding_Kind_DIRECT ||
-       code == proto::ColumnEncoding_Kind_DICTIONARY) {
-     return RleVersion_1;
-   }
-   if (code == proto::ColumnEncoding_Kind_DIRECT_V2 ||
-       code == proto::ColumnEncoding_Kind_DICTIONARY_V2) {
-     return RleVersion_2;
-   }
-   throw ParseError("Unknown encoding in convertRleVersion");
- }
-
- /**
-  * Initialise the state shared by every column reader: record the column
-  * id and the stripe's memory pool, and create a boolean RLE decoder over
-  * the PRESENT stream when the stripe returns one.
-  * @param type the type of the column to read
-  * @param stripe the stripe to fetch streams from
-  */
- ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
-     : columnId(type.getColumnId()),
-       memoryPool(stripe.getMemoryPool()) {
-   std::unique_ptr<SeekableInputStream> presentStream(
-       stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true));
-   if (presentStream.get() != nullptr) {
-     notNullDecoder = createBooleanRleDecoder(std::move(presentStream));
-   }
- }
-
- // Out-of-line destructor for ColumnReader; the body is intentionally empty.
- ColumnReader::~ColumnReader() {
- }
-
- /**
-  * Advance the not-null decoder (if there is one) past numValues rows.
-  * @param numValues the number of rows to skip
-  * @return numValues minus the number of rows whose not-null flag was
-  *         zero; numValues unchanged when there is no not-null decoder
-  */
- uint64_t ColumnReader::skip(uint64_t numValues) {
-   ByteRleDecoder* const present = notNullDecoder.get();
-   if (present == nullptr) {
-     return numValues;
-   }
-   // decode the flags one bounded chunk at a time and count the zero ones
-   const size_t MAX_BUFFER_SIZE = 32768;
-   char flags[MAX_BUFFER_SIZE];
-   const uint64_t chunkLimit =
-     std::min(MAX_BUFFER_SIZE, static_cast<size_t>(numValues));
-   uint64_t nullCount = 0;
-   uint64_t pending = numValues;
-   while (pending > 0) {
-     const uint64_t count = std::min(pending, chunkLimit);
-     present->next(flags, count, 0);
-     pending -= count;
-     for (uint64_t k = 0; k < count; ++k) {
-       if (!flags[k]) {
-         ++nullCount;
-       }
-     }
-   }
-   return numValues - nullCount;
- }
-
- /**
-  * Prepare rowBatch for numValues rows and fill in its not-null flags.
-  * The batch is resized if its capacity is too small. The flags come from
-  * the not-null decoder when there is one, otherwise they are copied from
-  * incomingMask when that is supplied.
-  * @param rowBatch the batch to fill
-  * @param numValues the number of rows to read
-  * @param incomingMask optional not-null flags supplied by the caller
-  */
- void ColumnReader::next(ColumnVectorBatch& rowBatch,
-                         uint64_t numValues,
-                         char* incomingMask) {
-   if (rowBatch.capacity < numValues) {
-     rowBatch.resize(numValues);
-   }
-   rowBatch.numElements = numValues;
-
-   bool sawNull = false;
-   ByteRleDecoder* const present = notNullDecoder.get();
-   if (present != nullptr) {
-     char* flags = rowBatch.notNull.data();
-     present->next(flags, numValues, incomingMask);
-     // stop scanning at the first null flag
-     for (uint64_t k = 0; k < numValues && !sawNull; ++k) {
-       sawNull = !flags[k];
-     }
-   } else if (incomingMask != nullptr) {
-     // no stream of our own: the caller's mask becomes this batch's flags
-     memcpy(rowBatch.notNull.data(), incomingMask, numValues);
-     sawNull = true;
-   }
-   rowBatch.hasNulls = sawNull;
- }
-
- /**
-  * Widen an array of bytes, stored at the start of buffer, in place into
-  * an array of longs. The conversion runs from the last element to the
-  * first so that bytes not yet converted are never overwritten.
-  * @param buffer holds the bytes on entry and the longs on return
-  * @param numValues the number of bytes to convert to longs
-  */
- void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
-   const char* bytes = reinterpret_cast<const char*>(buffer);
-   for (uint64_t remaining = numValues; remaining > 0; --remaining) {
-     const size_t slot = static_cast<size_t>(remaining - 1);
-     buffer[slot] = bytes[slot];
-   }
- }
-
- /**
-  * Column reader that decodes the DATA stream with a boolean RLE decoder
-  * and delivers the values through a LongVectorBatch.
-  */
- class BooleanColumnReader: public ColumnReader {
- public:
-   BooleanColumnReader(const Type& type, StripeStreams& stripe);
-   ~BooleanColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char* notNull) override;
-
- private:
-   std::unique_ptr<orc::ByteRleDecoder> rle;
- };
-
- BooleanColumnReader::BooleanColumnReader(const Type& type,
-                                          StripeStreams& stripe)
-     : ColumnReader(type, stripe) {
-   std::unique_ptr<SeekableInputStream> dataStream =
-     stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-   rle = createBooleanRleDecoder(std::move(dataStream));
- }
-
- BooleanColumnReader::~BooleanColumnReader() {
- }
-
- // Skip numValues rows; only the non-null ones occupy the data stream.
- uint64_t BooleanColumnReader::skip(uint64_t numValues) {
-   const uint64_t nonNull = ColumnReader::skip(numValues);
-   rle->skip(nonNull);
-   return nonNull;
- }
-
- void BooleanColumnReader::next(ColumnVectorBatch& rowBatch,
-                                uint64_t numValues,
-                                char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
-   int64_t* values = longBatch.data.data();
-   char* mask = 0;
-   if (rowBatch.hasNulls) {
-     mask = rowBatch.notNull.data();
-   }
-   // The decoder writes one char per value, so decode into the front of
-   // the long array and widen the bytes to longs in a second pass.
-   rle->next(reinterpret_cast<char*>(values), numValues, mask);
-   expandBytesToLongs(values, numValues);
- }
-
- /**
-  * Column reader that decodes the DATA stream with a byte RLE decoder and
-  * delivers the values through a LongVectorBatch.
-  */
- class ByteColumnReader: public ColumnReader {
- public:
-   ByteColumnReader(const Type& type, StripeStreams& stripe);
-   ~ByteColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char* notNull) override;
-
- private:
-   std::unique_ptr<orc::ByteRleDecoder> rle;
- };
-
- ByteColumnReader::ByteColumnReader(const Type& type,
-                                    StripeStreams& stripe)
-     : ColumnReader(type, stripe) {
-   std::unique_ptr<SeekableInputStream> dataStream =
-     stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-   rle = createByteRleDecoder(std::move(dataStream));
- }
-
- ByteColumnReader::~ByteColumnReader() {
- }
-
- // Skip numValues rows; only the non-null ones occupy the data stream.
- uint64_t ByteColumnReader::skip(uint64_t numValues) {
-   const uint64_t nonNull = ColumnReader::skip(numValues);
-   rle->skip(nonNull);
-   return nonNull;
- }
-
- void ByteColumnReader::next(ColumnVectorBatch& rowBatch,
-                             uint64_t numValues,
-                             char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
-   int64_t* values = longBatch.data.data();
-   char* mask = 0;
-   if (rowBatch.hasNulls) {
-     mask = rowBatch.notNull.data();
-   }
-   // The decoder writes one char per value, so decode into the front of
-   // the long array and widen the bytes to longs in a second pass.
-   rle->next(reinterpret_cast<char*>(values), numValues, mask);
-   expandBytesToLongs(values, numValues);
- }
-
- /**
-  * Column reader that decodes the DATA stream with a signed integer RLE
-  * decoder straight into a LongVectorBatch.
-  */
- class IntegerColumnReader: public ColumnReader {
- public:
-   IntegerColumnReader(const Type& type, StripeStreams& stripe);
-   ~IntegerColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char* notNull) override;
-
- protected:
-   std::unique_ptr<orc::RleDecoder> rle;
- };
-
- IntegerColumnReader::IntegerColumnReader(const Type& type,
-                                          StripeStreams& stripe)
-     : ColumnReader(type, stripe) {
-   const RleVersion version =
-     convertRleVersion(stripe.getEncoding(columnId).kind());
-   std::unique_ptr<SeekableInputStream> dataStream =
-     stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-   rle = createRleDecoder(std::move(dataStream), true, version, memoryPool);
- }
-
- IntegerColumnReader::~IntegerColumnReader() {
- }
-
- // Skip numValues rows; only the non-null ones occupy the data stream.
- uint64_t IntegerColumnReader::skip(uint64_t numValues) {
-   const uint64_t nonNull = ColumnReader::skip(numValues);
-   rle->skip(nonNull);
-   return nonNull;
- }
-
- void IntegerColumnReader::next(ColumnVectorBatch& rowBatch,
-                                uint64_t numValues,
-                                char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   int64_t* values = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-   char* mask = 0;
-   if (rowBatch.hasNulls) {
-     mask = rowBatch.notNull.data();
-   }
-   rle->next(values, numValues, mask);
- }
-
- /**
-  * Column reader that combines the values of the DATA stream (decoded by
-  * the inherited rle) with the encoded nanoseconds of the SECONDARY
-  * stream into a single long per row.
-  */
- class TimestampColumnReader: public IntegerColumnReader {
- public:
-   TimestampColumnReader(const Type& type, StripeStreams& stripe);
-   ~TimestampColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char* notNull) override;
-
- private:
-   std::unique_ptr<orc::RleDecoder> nanoRle;
-   DataBuffer<int64_t> nanoBuffer;
- };
-
- TimestampColumnReader::TimestampColumnReader(const Type& type,
-                                              StripeStreams& stripe)
-     : IntegerColumnReader(type, stripe),
-       nanoBuffer(memoryPool, 1024) {
-   const RleVersion version =
-     convertRleVersion(stripe.getEncoding(columnId).kind());
-   std::unique_ptr<SeekableInputStream> nanoStream =
-     stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
-   nanoRle = createRleDecoder(std::move(nanoStream), false, version,
-                              memoryPool);
- }
-
- TimestampColumnReader::~TimestampColumnReader() {
- }
-
- // Skip the same number of non-null values in both streams.
- uint64_t TimestampColumnReader::skip(uint64_t numValues) {
-   const uint64_t nonNull = IntegerColumnReader::skip(numValues);
-   nanoRle->skip(nonNull);
-   return nonNull;
- }
-
- void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
-                                  uint64_t numValues,
-                                  char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-   int64_t* stamps = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-
-   // grow the scratch buffer when this batch is larger than any before it
-   if (nanoBuffer.size() < numValues) {
-     nanoBuffer.resize(numValues);
-   }
-
-   rle->next(stamps, numValues, notNull);
-   nanoRle->next(nanoBuffer.data(), numValues, notNull);
-
-   for (uint64_t row = 0; row < numValues; ++row) {
-     if (notNull != nullptr && !notNull[row]) {
-       continue;
-     }
-     // The low three bits hold a count of removed trailing zeros; a
-     // non-zero count z means the value is scaled up by 10^(z+1).
-     int64_t nanosec = nanoBuffer[row] >> 3;
-     const uint64_t zeros = nanoBuffer[row] & 0x7;
-     if (zeros != 0) {
-       for (uint64_t left = zeros + 1; left > 0; --left) {
-         nanosec *= 10;
-       }
-     }
-     // scale the decoded value to nanoseconds and add the fixed offset
-     stamps[row] = stamps[row] * 1000000000 + 1420070400000000000;
-     if (stamps[row] >= 0) {
-       stamps[row] += nanosec;
-     } else {
-       stamps[row] -= nanosec;
-     }
-   }
- }
-
- /**
-  * Column reader for FLOAT and DOUBLE columns. Values are read from the
-  * DATA stream as 4-byte (FLOAT) or 8-byte little-endian bit patterns and
-  * delivered through a DoubleVectorBatch.
-  */
- class DoubleColumnReader: public ColumnReader {
- public:
-   DoubleColumnReader(const Type& type, StripeStreams& stripe);
-   ~DoubleColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char* notNull) override;
-
- private:
-   std::unique_ptr<SeekableInputStream> inputStream;
-   TypeKind columnKind;
-   const uint64_t bytesPerValue;
-   // unread part of the chunk most recently returned by inputStream
-   const char *bufferPointer;
-   const char *bufferEnd;
-
-   /**
-    * Return the next byte of the stream, fetching a new chunk when the
-    * current one is used up.
-    * @throws ParseError if the stream cannot supply another chunk
-    */
-   unsigned char readByte() {
-     if (bufferPointer == bufferEnd) {
-       int length;
-       if (!inputStream->Next
-           (reinterpret_cast<const void**>(&bufferPointer), &length)) {
-         throw ParseError("bad read in DoubleColumnReader::next()");
-       }
-       bufferEnd = bufferPointer + length;
-     }
-     return static_cast<unsigned char>(*(bufferPointer++));
-   }
-
-   /**
-    * Read eight bytes, least significant first, as a double. The bits are
-    * assembled in an unsigned integer (no shift into a sign bit) and
-    * copied with memcpy, which is the well-defined way to reinterpret
-    * an object representation.
-    */
-   double readDouble() {
-     uint64_t bits = 0;
-     for (uint64_t i = 0; i < 8; i++) {
-       bits |= static_cast<uint64_t>(readByte()) << (i * 8);
-     }
-     double result;
-     memcpy(&result, &bits, sizeof(result));
-     return result;
-   }
-
-   /**
-    * Read four bytes, least significant first, as a float and widen it
-    * to double. Uses the same unsigned-accumulate-and-memcpy approach as
-    * readDouble.
-    */
-   double readFloat() {
-     uint32_t bits = 0;
-     for (uint64_t i = 0; i < 4; i++) {
-       bits |= static_cast<uint32_t>(readByte()) << (i * 8);
-     }
-     float result;
-     memcpy(&result, &bits, sizeof(result));
-     return result;
-   }
- };
-
- DoubleColumnReader::DoubleColumnReader(const Type& type,
-                                        StripeStreams& stripe)
-     : ColumnReader(type, stripe),
-       inputStream(stripe.getStream(columnId,
-                                    proto::Stream_Kind_DATA,
-                                    true)),
-       columnKind(type.getKind()),
-       bytesPerValue((type.getKind() == FLOAT) ? 4 : 8),
-       bufferPointer(nullptr),
-       bufferEnd(nullptr) {
-   // PASS
- }
-
- DoubleColumnReader::~DoubleColumnReader() {
-   // PASS
- }
-
- /**
-  * Skip numValues rows.
-  * @return the number of non-null values that were skipped
-  */
- uint64_t DoubleColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-
-   const uint64_t bytesToSkip = bytesPerValue * numValues;
-   const size_t bytesBuffered = static_cast<size_t>(bufferEnd - bufferPointer);
-   if (bytesBuffered >= bytesToSkip) {
-     // everything to skip is already in the current chunk
-     bufferPointer += bytesToSkip;
-   } else {
-     // drop the rest of the chunk and skip the remainder in the stream
-     inputStream->Skip(static_cast<int>(bytesToSkip - bytesBuffered));
-     bufferEnd = nullptr;
-     bufferPointer = nullptr;
-   }
-
-   return numValues;
- }
-
- /**
-  * Read numValues rows into rowBatch, which must be a DoubleVectorBatch.
-  * Slots of null rows are left untouched.
-  */
- void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
-                               uint64_t numValues,
-                               char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   // update the notNull from the parent class
-   notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-   double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
-
-   const bool isFloat = (columnKind == FLOAT);
-   for (size_t i = 0; i < numValues; ++i) {
-     if (notNull == nullptr || notNull[i]) {
-       outArray[i] = isFloat ? readFloat() : readDouble();
-     }
-   }
- }
-
- /**
-  * Fill buffer with exactly bufferSize bytes read from stream.
-  * @param buffer the destination, at least bufferSize bytes long
-  * @param bufferSize the number of bytes to read
-  * @param stream the stream to read from
-  * @throws ParseError if the stream ends early, or if it returns a chunk
-  *         that does not fit in the space left in buffer
-  */
- void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
-   int64_t posn = 0;
-   while (posn < bufferSize) {
-     const void* chunk;
-     int length;
-     if (!stream->Next(&chunk, &length)) {
-       throw ParseError("bad read in readFully");
-     }
-     // Never copy past the end of the destination: a chunk larger than
-     // the remaining space means the stream holds more than expected.
-     if (length > bufferSize - posn) {
-       throw ParseError("bad read in readFully: too much data in stream");
-     }
-     memcpy(buffer + posn, chunk, static_cast<size_t>(length));
-     posn += length;
-   }
- }
-
- /**
-  * Column reader for dictionary-encoded string columns. The constructor
-  * loads the whole dictionary: dictionaryBlob holds the concatenated
-  * entries and dictionaryOffset[i] .. dictionaryOffset[i+1] delimits
-  * entry i. The DATA stream then supplies one dictionary index per
-  * non-null row.
-  */
- class StringDictionaryColumnReader: public ColumnReader {
- private:
-   DataBuffer<char> dictionaryBlob;
-   DataBuffer<int64_t> dictionaryOffset;
-   std::unique_ptr<RleDecoder> rle;
-   uint64_t dictionaryCount;
-
- public:
-   StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);
-   ~StringDictionaryColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
- };
-
- /**
-  * @throws ParseError if the encoding is unknown, a dictionary entry has
-  *         a negative length, or the dictionary blob cannot be read
-  */
- StringDictionaryColumnReader::StringDictionaryColumnReader
-     (const Type& type,
-      StripeStreams& stripe
-      ): ColumnReader(type, stripe),
-         dictionaryBlob(stripe.getMemoryPool()),
-         dictionaryOffset(stripe.getMemoryPool()) {
-   RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
-                                             .kind());
-   dictionaryCount = stripe.getEncoding(columnId).dictionarysize();
-   rle = createRleDecoder(stripe.getStream(columnId,
-                                           proto::Stream_Kind_DATA,
-                                           true),
-                          false, rleVersion, memoryPool);
-   std::unique_ptr<RleDecoder> lengthDecoder =
-     createRleDecoder(stripe.getStream(columnId,
-                                       proto::Stream_Kind_LENGTH,
-                                       false),
-                      false, rleVersion, memoryPool);
-   dictionaryOffset.resize(dictionaryCount+1);
-   int64_t* lengthArray = dictionaryOffset.data();
-   // decode the entry lengths into slots 1..N, then turn them into
-   // running offsets so that slot i is the start of entry i
-   lengthDecoder->next(lengthArray + 1, dictionaryCount, 0);
-   lengthArray[0] = 0;
-   for(uint64_t i=1; i < dictionaryCount + 1; ++i) {
-     if (lengthArray[i] < 0) {
-       // a negative length would make the offsets non-monotonic
-       throw ParseError("Negative dictionary entry length");
-     }
-     lengthArray[i] += lengthArray[i-1];
-   }
-   int64_t blobSize = lengthArray[dictionaryCount];
-   dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
-   std::unique_ptr<SeekableInputStream> blobStream =
-     stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
-   readFully(dictionaryBlob.data(), blobSize, blobStream.get());
- }
-
- StringDictionaryColumnReader::~StringDictionaryColumnReader() {
-   // PASS
- }
-
- /**
-  * Skip numValues rows.
-  * @return the number of non-null values that were skipped
-  */
- uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-   rle->skip(numValues);
-   return numValues;
- }
-
- /**
-  * Read numValues rows into rowBatch, which must be a StringVectorBatch.
-  * Each non-null row gets a pointer into the dictionary blob and the
-  * length of its entry; slots of null rows are left untouched.
-  * @throws ParseError if a row refers to an entry outside the dictionary
-  */
- void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
-                                         uint64_t numValues,
-                                         char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   // update the notNull from the parent class
-   notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-   StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
-   char *blob = dictionaryBlob.data();
-   int64_t *dictionaryOffsets = dictionaryOffset.data();
-   char **outputStarts = byteBatch.data.data();
-   int64_t *outputLengths = byteBatch.length.data();
-   // the length array temporarily holds the dictionary index of each row
-   rle->next(outputLengths, numValues, notNull);
-   for(uint64_t i=0; i < numValues; ++i) {
-     if (notNull && !notNull[i]) {
-       continue;
-     }
-     int64_t entry = outputLengths[i];
-     if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
-       throw ParseError("Entry index out of range in StringDictionaryColumn");
-     }
-     outputStarts[i] = blob + dictionaryOffsets[entry];
-     outputLengths[i] = dictionaryOffsets[entry+1] -
-       dictionaryOffsets[entry];
-   }
- }
-
- /**
-  * Column reader for directly encoded string columns: the LENGTH stream
-  * gives the length of each value and the DATA stream holds the bytes.
-  */
- class StringDirectColumnReader: public ColumnReader {
- public:
-   StringDirectColumnReader(const Type& type, StripeStreams& stripe);
-   ~StringDirectColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
-
- private:
-   DataBuffer<char> blobBuffer;
-   std::unique_ptr<RleDecoder> lengthRle;
-   std::unique_ptr<SeekableInputStream> blobStream;
-   // unread remainder of the chunk last returned by blobStream
-   const char *lastBuffer;
-   size_t lastBufferLength;
-
-   /**
-    * Add up the lengths of the values.
-    * @param lengths the array of lengths
-    * @param notNull the array of notNull flags, or null to count every
-    *        entry
-    * @param numValues the number of entries in the arrays
-    * @return the total number of bytes for the counted values
-    */
-   size_t computeSize(const int64_t *lengths, const char *notNull,
-                      uint64_t numValues);
- };
-
- StringDirectColumnReader::StringDirectColumnReader(const Type& type,
-                                                    StripeStreams& stripe)
-     : ColumnReader(type, stripe),
-       blobBuffer(stripe.getMemoryPool()) {
-   const RleVersion version =
-     convertRleVersion(stripe.getEncoding(columnId).kind());
-   std::unique_ptr<SeekableInputStream> lengthStream =
-     stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
-   lengthRle = createRleDecoder(std::move(lengthStream), false, version,
-                                memoryPool);
-   blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-   // nothing has been read from the blob stream yet
-   lastBuffer = nullptr;
-   lastBufferLength = 0;
- }
-
- StringDirectColumnReader::~StringDirectColumnReader() {
- }
-
- /**
-  * Skip numValues rows.
-  * @return the number of non-null values that were skipped
-  */
- uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
-   const size_t BUFFER_SIZE = 1024;
-   numValues = ColumnReader::skip(numValues);
-   int64_t lengths[BUFFER_SIZE];
-   size_t bytesToSkip = 0;
-   // read the lengths, so we know how many bytes to skip
-   for (uint64_t pending = numValues; pending > 0; ) {
-     const uint64_t count = std::min(BUFFER_SIZE,
-                                     static_cast<size_t>(pending));
-     lengthRle->next(lengths, count, 0);
-     bytesToSkip += computeSize(lengths, 0, count);
-     pending -= count;
-   }
-   if (bytesToSkip > lastBufferLength) {
-     // the buffered bytes are not enough: drop them and move the stream
-     // forward by the rest
-     bytesToSkip -= lastBufferLength;
-     blobStream->Skip(static_cast<int>(bytesToSkip));
-     lastBufferLength = 0;
-     lastBuffer = 0;
-   } else {
-     // the skipped bytes all come out of the buffered chunk
-     lastBufferLength -= bytesToSkip;
-     lastBuffer += bytesToSkip;
-   }
-   return numValues;
- }
-
- // Sum the lengths of the entries whose notNull flag is set, or of every
- // entry when notNull is null.
- size_t StringDirectColumnReader::computeSize(const int64_t* lengths,
-                                              const char* notNull,
-                                              uint64_t numValues) {
-   size_t sum = 0;
-   for (size_t k = 0; k < numValues; ++k) {
-     if (notNull == nullptr || notNull[k]) {
-       sum += static_cast<size_t>(lengths[k]);
-     }
-   }
-   return sum;
- }
-
- /**
-  * Read numValues rows into rowBatch, which must be a StringVectorBatch.
-  * The lengths come from the LENGTH stream. The start pointers refer
-  * either to blobBuffer (bytes that had to be copied) or directly to the
-  * chunk most recently returned by blobStream.
-  * @throws ParseError if blobStream cannot supply enough bytes
-  */
- void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- // update the notNull from the parent class
- notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
- StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
- char **startPtr = byteBatch.data.data();
- int64_t *lengthPtr = byteBatch.length.data();
-
- // read the length vector
- lengthRle->next(lengthPtr, numValues, notNull);
-
- // figure out the total length of data we need from the blob stream
- const size_t totalLength = computeSize(lengthPtr, notNull, numValues);
-
- // Load data from the blob stream into our buffer until we have enough
- // to get the rest directly out of the stream's buffer.
- size_t bytesBuffered = 0;
- blobBuffer.resize(totalLength);
- char *ptr= blobBuffer.data();
- // Each pass copies the unread rest of the current chunk into blobBuffer
- // and fetches the next chunk; the loop ends once the copied bytes plus
- // the current chunk cover totalLength.
- // NOTE(review): ptr is taken before the resize below; this presumably
- // relies on resize to a size <= totalLength not moving the storage -
- // confirm against DataBuffer.
- while (bytesBuffered + lastBufferLength < totalLength) {
- blobBuffer.resize(bytesBuffered + lastBufferLength);
- memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
- bytesBuffered += lastBufferLength;
- const void* readBuffer;
- int readLength;
- if (!blobStream->Next(&readBuffer, &readLength)) {
- throw ParseError("failed to read in StringDirectColumnReader.next");
- }
- lastBuffer = static_cast<const char*>(readBuffer);
- lastBufferLength = static_cast<size_t>(readLength);
- }
-
- // Set up the start pointers for the ones that will come out of the buffer.
- size_t filledSlots = 0;
- size_t usedBytes = 0;
- ptr = blobBuffer.data();
- // NOTE(review): in the notNull branch the loop condition reads
- // lengthPtr[filledSlots] before notNull[filledSlots] is tested; whether
- // the length decoder leaves a defined value in null slots is not
- // visible here - confirm.
- if (notNull) {
- while (filledSlots < numValues &&
- (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
- bytesBuffered)) {
- if (notNull[filledSlots]) {
- startPtr[filledSlots] = ptr + usedBytes;
- usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
- }
- filledSlots += 1;
- }
- } else {
- while (filledSlots < numValues &&
- (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
- bytesBuffered)) {
- startPtr[filledSlots] = ptr + usedBytes;
- usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
- filledSlots += 1;
- }
- }
-
- // do we need to complete the last value in the blob buffer?
- // If so, the value starts in blobBuffer and continues in the current
- // chunk: append its missing bytes to blobBuffer so it is contiguous.
- if (usedBytes < bytesBuffered) {
- size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
- (bytesBuffered - usedBytes);
- blobBuffer.resize(bytesBuffered + moreBytes);
- ptr = blobBuffer.data();
- memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
- lastBuffer += moreBytes;
- lastBufferLength -= moreBytes;
- startPtr[filledSlots++] = ptr + usedBytes;
- }
-
- // Finally, set up any remaining entries into the stream buffer
- // These start pointers refer to the stream's chunk without copying.
- // NOTE(review): how long that chunk stays valid is decided by
- // blobStream and is not visible here - confirm.
- if (notNull) {
- while (filledSlots < numValues) {
- if (notNull[filledSlots]) {
- startPtr[filledSlots] = const_cast<char*>(lastBuffer);
- lastBuffer += lengthPtr[filledSlots];
- lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
- }
- filledSlots += 1;
- }
- } else {
- while (filledSlots < numValues) {
- startPtr[filledSlots] = const_cast<char*>(lastBuffer);
- lastBuffer += lengthPtr[filledSlots];
- lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
- filledSlots += 1;
- }
- }
- }
-
- /**
-  * Column reader for struct columns. It owns one reader per selected
-  * sub-column, in sub-column order, and forwards skip and next to each.
-  */
- class StructColumnReader: public ColumnReader {
- private:
-   // Owning pointers, so that the readers already built are released if
-   // the constructor throws part way through.
-   std::vector<std::unique_ptr<ColumnReader> > children;
-
- public:
-   StructColumnReader(const Type& type, StripeStreams& stripe);
-   ~StructColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
- };
-
- /**
-  * @throws ParseError if the column's encoding is anything but DIRECT
-  */
- StructColumnReader::StructColumnReader(const Type& type,
-                                        StripeStreams& stripe
-                                        ): ColumnReader(type, stripe) {
-   // build a reader for each selected sub-column
-   const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-   switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
-   case proto::ColumnEncoding_Kind_DIRECT:
-     for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
-       const Type& child = type.getSubtype(i);
-       if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
-         children.push_back(buildReader(child, stripe));
-       }
-     }
-     break;
-   case proto::ColumnEncoding_Kind_DIRECT_V2:
-   case proto::ColumnEncoding_Kind_DICTIONARY:
-   case proto::ColumnEncoding_Kind_DICTIONARY_V2:
-   default:
-     throw ParseError("Unknown encoding for StructColumnReader");
-   }
- }
-
- StructColumnReader::~StructColumnReader() {
-   // the children are released by their unique_ptr owners
- }
-
- /**
-  * Skip numValues rows in this column and in every child reader.
-  * @return the number of non-null struct values that were skipped
-  */
- uint64_t StructColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-   for (size_t i = 0; i < children.size(); ++i) {
-     children[i]->skip(numValues);
-   }
-   return numValues;
- }
-
- /**
-  * Read numValues rows. The struct's own not-null flags are passed to
-  * each child as its incoming mask; child i fills fields[i] of rowBatch,
-  * which must be a StructVectorBatch when there are children.
-  */
- void StructColumnReader::next(ColumnVectorBatch& rowBatch,
-                               uint64_t numValues,
-                               char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   if (children.empty()) {
-     return;
-   }
-   notNull = rowBatch.hasNulls? rowBatch.notNull.data() : 0;
-   // cast once rather than once per child
-   StructVectorBatch& structBatch = dynamic_cast<StructVectorBatch&>(rowBatch);
-   for (size_t i = 0; i < children.size(); ++i) {
-     children[i]->next(*(structBatch.fields[i]), numValues, notNull);
-   }
- }
-
- /**
-  * Column reader for list columns: the LENGTH stream gives the number of
-  * elements of each list and an optional child reader supplies the
-  * elements when the element column is selected.
-  */
- class ListColumnReader: public ColumnReader {
- public:
-   ListColumnReader(const Type& type, StripeStreams& stripe);
-   ~ListColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
-
- private:
-   std::unique_ptr<ColumnReader> child;
-   std::unique_ptr<RleDecoder> rle;
- };
-
- ListColumnReader::ListColumnReader(const Type& type,
-                                    StripeStreams& stripe)
-     : ColumnReader(type, stripe) {
-   const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-   const RleVersion version =
-     convertRleVersion(stripe.getEncoding(columnId).kind());
-   std::unique_ptr<SeekableInputStream> lengthStream =
-     stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
-   rle = createRleDecoder(std::move(lengthStream), false, version,
-                          memoryPool);
-   // only build the element reader when the element column is selected
-   const Type& elementType = type.getSubtype(0);
-   if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
-     child = buildReader(elementType, stripe);
-   }
- }
-
- ListColumnReader::~ListColumnReader() {
- }
-
- /**
-  * Skip numValues rows.
-  * @return the number of non-null lists that were skipped
-  */
- uint64_t ListColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-   ColumnReader* const elementReader = child.get();
-   if (elementReader == nullptr) {
-     // no child to keep in step, so the lengths need not be decoded
-     rle->skip(numValues);
-     return numValues;
-   }
-   // add up the list lengths to learn how many elements to skip
-   const uint64_t BUFFER_SIZE = 1024;
-   int64_t lengths[BUFFER_SIZE];
-   uint64_t elementTotal = 0;
-   for (uint64_t pending = numValues; pending > 0; ) {
-     const uint64_t count = std::min(pending, BUFFER_SIZE);
-     rle->next(lengths, count, 0);
-     for (size_t k = 0; k < count; ++k) {
-       elementTotal += static_cast<size_t>(lengths[k]);
-     }
-     pending -= count;
-   }
-   elementReader->skip(elementTotal);
-   return numValues;
- }
-
- /**
-  * Read numValues rows into rowBatch, which must be a ListVectorBatch.
-  * On return offsets[i] is the index of the first element of row i and
-  * offsets[numValues] is the total element count; a null row has the
-  * same offset as the row after it.
-  */
- void ListColumnReader::next(ColumnVectorBatch& rowBatch,
-                             uint64_t numValues,
-                             char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
-   int64_t* offsets = listBatch.offsets.data();
-   notNull = listBatch.hasNulls ? listBatch.notNull.data() : 0;
-   // offsets first receives the lengths, then is rewritten in place
-   rle->next(offsets, numValues, notNull);
-   uint64_t runningTotal = 0;
-   for (size_t row = 0; row < numValues; ++row) {
-     uint64_t length = 0;
-     if (notNull == nullptr || notNull[row]) {
-       length = static_cast<uint64_t>(offsets[row]);
-     }
-     offsets[row] = static_cast<int64_t>(runningTotal);
-     runningTotal += length;
-   }
-   offsets[numValues] = static_cast<int64_t>(runningTotal);
-   if (ColumnReader* elementReader = child.get()) {
-     elementReader->next(*(listBatch.elements.get()), runningTotal, 0);
-   }
- }
-
- /**
-  * Column reader for map columns: the LENGTH stream gives the number of
-  * entries of each map, and optional key and element readers supply the
-  * entries for whichever of the two sub-columns is selected.
-  */
- class MapColumnReader: public ColumnReader {
- public:
-   MapColumnReader(const Type& type, StripeStreams& stripe);
-   ~MapColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
-
- private:
-   std::unique_ptr<ColumnReader> keyReader;
-   std::unique_ptr<ColumnReader> elementReader;
-   std::unique_ptr<RleDecoder> rle;
- };
-
- MapColumnReader::MapColumnReader(const Type& type,
-                                  StripeStreams& stripe)
-     : ColumnReader(type, stripe) {
-   const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-   const RleVersion version =
-     convertRleVersion(stripe.getEncoding(columnId).kind());
-   std::unique_ptr<SeekableInputStream> lengthStream =
-     stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
-   rle = createRleDecoder(std::move(lengthStream), false, version,
-                          memoryPool);
-   // build a reader for the key and/or value column if it is selected
-   const Type& keyType = type.getSubtype(0);
-   if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
-     keyReader = buildReader(keyType, stripe);
-   }
-   const Type& valueType = type.getSubtype(1);
-   if (selectedColumns[static_cast<uint64_t>(valueType.getColumnId())]) {
-     elementReader = buildReader(valueType, stripe);
-   }
- }
-
- MapColumnReader::~MapColumnReader() {
- }
-
- /**
-  * Skip numValues rows.
-  * @return the number of non-null maps that were skipped
-  */
- uint64_t MapColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-   ColumnReader* const keys = keyReader.get();
-   ColumnReader* const values = elementReader.get();
-   if (keys == nullptr && values == nullptr) {
-     // no child to keep in step, so the lengths need not be decoded
-     rle->skip(numValues);
-     return numValues;
-   }
-   // add up the map sizes to learn how many entries to skip
-   const uint64_t BUFFER_SIZE = 1024;
-   int64_t lengths[BUFFER_SIZE];
-   uint64_t entryTotal = 0;
-   for (uint64_t pending = numValues; pending > 0; ) {
-     const uint64_t count = std::min(pending, BUFFER_SIZE);
-     rle->next(lengths, count, 0);
-     for (size_t k = 0; k < count; ++k) {
-       entryTotal += static_cast<size_t>(lengths[k]);
-     }
-     pending -= count;
-   }
-   if (keys != nullptr) {
-     keys->skip(entryTotal);
-   }
-   if (values != nullptr) {
-     values->skip(entryTotal);
-   }
-   return numValues;
- }
-
- /**
-  * Read numValues rows into rowBatch, which must be a MapVectorBatch.
-  * On return offsets[i] is the index of the first entry of row i and
-  * offsets[numValues] is the total entry count; a null row has the same
-  * offset as the row after it.
-  */
- void MapColumnReader::next(ColumnVectorBatch& rowBatch,
-                            uint64_t numValues,
-                            char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   MapVectorBatch& mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
-   int64_t* offsets = mapBatch.offsets.data();
-   notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : 0;
-   // offsets first receives the lengths, then is rewritten in place
-   rle->next(offsets, numValues, notNull);
-   uint64_t runningTotal = 0;
-   for (size_t row = 0; row < numValues; ++row) {
-     uint64_t length = 0;
-     if (notNull == nullptr || notNull[row]) {
-       length = static_cast<uint64_t>(offsets[row]);
-     }
-     offsets[row] = static_cast<int64_t>(runningTotal);
-     runningTotal += length;
-   }
-   offsets[numValues] = static_cast<int64_t>(runningTotal);
-   if (ColumnReader* keys = keyReader.get()) {
-     keys->next(*(mapBatch.keys.get()), runningTotal, 0);
-   }
-   if (ColumnReader* values = elementReader.get()) {
-     values->next(*(mapBatch.elements.get()), runningTotal, 0);
-   }
- }
-
- class UnionColumnReader: public ColumnReader {
- private:
- std::unique_ptr<ByteRleDecoder> rle;
- std::vector<ColumnReader*> childrenReader;
- std::vector<int64_t> childrenCounts;
- uint64_t numChildren;
-
- public:
- UnionColumnReader(const Type& type, StripeStreams& stipe);
- ~UnionColumnReader();
-
- uint64_t skip(uint64_t numValues) override;
-
- void next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) override;
- };
-
- UnionColumnReader::UnionColumnReader(const Type& type,
- StripeStreams& stripe
- ): ColumnReader(type, stripe) {
- numChildren = type.getSubtypeCount();
- childrenReader.resize(numChildren);
- childrenCounts.resize(numChildren);
-
- rle = createByteRleDecoder(stripe.getStream(columnId,
- proto::Stream_Kind_DATA,
- true));
- // figure out which types are selected
- const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
- for(unsigned int i=0; i < numChildren; ++i) {
- const Type &child = type.getSubtype(i);
- if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
- childrenReader[i] = buildReader(child, stripe).release();
- }
- }
- }
-
- UnionColumnReader::~UnionColumnReader() {
- for(std::vector<ColumnReader*>::iterator itr = childrenReader.begin();
- itr != childrenReader.end(); ++itr) {
- delete *itr;
- }
- }
-
- /**
-  * Skip rows of the union column.
-  * Decodes the tag of every skipped value so that each child reader can be
-  * told how many of its own values to skip.
-  * @param numValues the number of rows to skip
-  * @return the value returned by ColumnReader::skip for these rows
-  * @throws ParseError if a decoded tag does not name an existing subtype
-  */
- uint64_t UnionColumnReader::skip(uint64_t numValues) {
-   numValues = ColumnReader::skip(numValues);
-   const uint64_t BUFFER_SIZE = 1024;
-   char buffer[BUFFER_SIZE];
-   uint64_t lengthsRead = 0;
-   int64_t *counts = childrenCounts.data();
-   memset(counts, 0, sizeof(int64_t) * numChildren);
-   while (lengthsRead < numValues) {
-     uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
-     rle->next(buffer, chunk, 0);
-     for(size_t i=0; i < chunk; ++i) {
-       // Go through unsigned char: where plain char is signed, a tag of
-       // 128 or more would otherwise sign-extend into a huge index.
-       const size_t tag = static_cast<unsigned char>(buffer[i]);
-       if (tag >= numChildren) {
-         throw ParseError("Union tag out of range in UnionColumnReader");
-       }
-       counts[tag] += 1;
-     }
-     lengthsRead += chunk;
-   }
-   for(size_t i=0; i < numChildren; ++i) {
-     if (counts[i] != 0 && childrenReader[i] != NULL) {
-       childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
-     }
-   }
-   return numValues;
- }
-
- /**
-  * Read the next rows of the union column into rowBatch.
-  * Fills the batch's tags, assigns each non-null row its offset within the
-  * child selected by its tag, and then reads the counted number of values
-  * from every selected child.
-  * @param rowBatch the batch to fill; must be a UnionVectorBatch
-  * @param numValues the number of rows to read
-  * @param notNull incoming null mask, forwarded to ColumnReader::next
-  * @throws ParseError if a decoded tag does not name an existing subtype
-  */
- void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
-                              uint64_t numValues,
-                              char *notNull) {
-   ColumnReader::next(rowBatch, numValues, notNull);
-   UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
-   uint64_t* offsets = unionBatch.offsets.data();
-   int64_t* counts = childrenCounts.data();
-   memset(counts, 0, sizeof(int64_t) * numChildren);
-   unsigned char* tags = unionBatch.tags.data();
-   notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : 0;
-   rle->next(reinterpret_cast<char *>(tags), numValues, notNull);
-   // set the offsets for each row
-   for(size_t i=0; i < numValues; ++i) {
-     if (notNull && !notNull[i]) {
-       // null rows carry no tag and consume no child value
-       continue;
-     }
-     const size_t tag = tags[i];
-     // The tag comes from the file; never index counts with an
-     // unchecked value.
-     if (tag >= numChildren) {
-       throw ParseError("Union tag out of range in UnionColumnReader");
-     }
-     offsets[i] = static_cast<uint64_t>(counts[tag]++);
-   }
-   // read the right number of each child column
-   for(size_t i=0; i < numChildren; ++i) {
-     if (childrenReader[i] != nullptr) {
-       childrenReader[i]->next(*(unionBatch.children[i]),
-                               static_cast<uint64_t>(counts[i]), nullptr);
-     }
-   }
- }
-
- /**
-  * Decode a zigzag-encoded Int128 in place.
-  * The lowest bit is the sign flag and the remaining bits are the
-  * magnitude; the argument is overwritten with the signed result.
-  */
- void unZigZagInt128(Int128& value) {
-   const bool isNegative = value.getLowBits() & 1;
-   value >>= 1;
-   if (!isNegative) {
-     return;
-   }
-   // negative inputs decode to -(magnitude) - 1
-   value.negate();
-   value -= 1;
- }
-
- /**
-  * Reader for DECIMAL columns whose values fit in an int64_t.
-  * Each value is read as a zigzag varint from valueStream together with
-  * the scale it was written with (from scaleDecoder) and is then rescaled
-  * to the column's declared scale.
-  */
- class Decimal64ColumnReader: public ColumnReader {
- public:
-   static const uint32_t MAX_PRECISION_64 = 18;
-   static const uint32_t MAX_PRECISION_128 = 38;
-   // POWERS_OF_TEN[i] == 10^i for i in [0, MAX_PRECISION_64]
-   static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];
-
- protected:
-   std::unique_ptr<SeekableInputStream> valueStream;
-   int32_t precision;
-   int32_t scale;
-   // unread part of the block most recently handed out by valueStream
-   const char* buffer;
-   const char* bufferEnd;
-
-   std::unique_ptr<RleDecoder> scaleDecoder;
-
-   /**
-    * Read the valueStream for more bytes.
-    * Returns immediately while unread bytes remain in the current block.
-    * @throws ParseError if the stream is exhausted
-    */
-   void readBuffer() {
-     while (buffer == bufferEnd) {
-       int length;
-       if (!valueStream->Next(reinterpret_cast<const void**>(&buffer),
-                              &length)) {
-         throw ParseError("Read past end of stream in Decimal64ColumnReader "+
-                          valueStream->getName());
-       }
-       bufferEnd = buffer + length;
-     }
-   }
-
-   /**
-    * Read one zigzag varint and rescale it to the column's scale.
-    * @param value receives the decoded, rescaled value
-    * @param currentScale the scale the value was written with
-    * @throws ParseError if the scales differ by more than MAX_PRECISION_64
-    */
-   void readInt64(int64_t& value, int32_t currentScale) {
-     value = 0;
-     size_t offset = 0;
-     while (true) {
-       readBuffer();
-       unsigned char ch = static_cast<unsigned char>(*(buffer++));
-       // Shifting a 64-bit value by 64 or more bits is undefined, so the
-       // payload of an over-long varint is dropped; the bytes are still
-       // consumed to keep the stream position correct.
-       if (offset < 64) {
-         value |= static_cast<uint64_t>(ch & 0x7f) << offset;
-       }
-       offset += 7;
-       if (!(ch & 0x80)) {
-         break;
-       }
-     }
-     value = unZigZag(static_cast<uint64_t>(value));
-     // currentScale comes from the file: do the subtraction in 64 bits and
-     // range-check it before using it as an index into POWERS_OF_TEN.
-     const int64_t scaleDelta = static_cast<int64_t>(scale) - currentScale;
-     if (scaleDelta > MAX_PRECISION_64 || -scaleDelta > MAX_PRECISION_64) {
-       throw ParseError("Decimal scale out of range");
-     }
-     if (scaleDelta > 0) {
-       value *= POWERS_OF_TEN[scaleDelta];
-     } else if (scaleDelta < 0) {
-       value /= POWERS_OF_TEN[-scaleDelta];
-     }
-   }
-
- public:
-   Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
-   ~Decimal64ColumnReader();
-
-   uint64_t skip(uint64_t numValues) override;
-
-   void next(ColumnVectorBatch& rowBatch,
-             uint64_t numValues,
-             char *notNull) override;
- };
- const uint32_t Decimal64ColumnReader::MAX_PRECISION_64; // out-of-class definitions: needed because the constants are bound to references (e.g. by std::min in scaleInt128)
- const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
- const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]= // entry i holds 10^i, for i = 0 .. 18
- {1,
- 10,
- 100,
- 1000,
- 10000,
- 100000,
- 1000000,
- 10000000,
- 100000000,
- 1000000000,
- 10000000000,
- 100000000000,
- 1000000000000,
- 10000000000000,
- 100000000000000,
- 1000000000000000,
- 10000000000000000,
- 100000000000000000,
- 1000000000000000000};
-
- Decimal64ColumnReader::Decimal64ColumnReader(const Type& type, // takes scale/precision from the type and opens the value and scale streams
- StripeStreams& stripe
- ): ColumnReader(type, stripe) {
- scale = static_cast<int32_t>(type.getScale());
- precision = static_cast<int32_t>(type.getPrecision());
- valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); // unscaled values, read byte by byte through buffer/bufferEnd
- buffer = nullptr; // start with an empty window so the first read pulls a block from valueStream
- bufferEnd = nullptr;
- RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); // RLE version follows the column encoding
- scaleDecoder = createRleDecoder(stripe.getStream // per-value scales come from the SECONDARY stream
- (columnId,
- proto::Stream_Kind_SECONDARY,
- true),
- true, vers, memoryPool); // NOTE(review): the boolean presumably selects signed decoding - confirm against createRleDecoder
- }
-
- Decimal64ColumnReader::~Decimal64ColumnReader() {
- // PASS: nothing to release explicitly; the streams are held by unique_ptr
- }
-
- uint64_t Decimal64ColumnReader::skip(uint64_t numValues) { // skips rows without decoding the values; returns what ColumnReader::skip returned
- numValues = ColumnReader::skip(numValues);
- uint64_t skipped = 0;
- while (skipped < numValues) {
- readBuffer();
- if (!(0x80 & *(buffer++))) { // a byte without the continuation bit ends one varint
- skipped += 1;
- }
- }
- scaleDecoder->skip(numValues); // keep the scale stream in step with the value stream
- return numValues;
- }
-
- void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, // reads numValues decimals into a Decimal64VectorBatch
- uint64_t numValues,
- char *notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0; // from here on use the batch's own null mask, if it has one
- Decimal64VectorBatch &batch =
- dynamic_cast<Decimal64VectorBatch&>(rowBatch); // throws std::bad_cast if the caller passed a different batch type
- int64_t* values = batch.values.data();
- // read the next group of scales
- int64_t* scaleBuffer = batch.readScales.data();
- scaleDecoder->next(scaleBuffer, numValues, notNull);
- batch.precision = precision;
- batch.scale = scale;
- if (notNull) {
- for(size_t i=0; i < numValues; ++i) {
- if (notNull[i]) { // null rows are not read from the value stream; values[i] is left untouched
- readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
- }
- }
- } else {
- for(size_t i=0; i < numValues; ++i) {
- readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
- }
- }
- }
-
- void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) { // rescales value in place from currentScale to scale
- if (scale > currentScale) {
- while(scale > currentScale) { // multiply in steps of at most 10^18, the largest entry of POWERS_OF_TEN
- uint32_t scaleAdjust =
- std::min(Decimal64ColumnReader::MAX_PRECISION_64,
- scale - currentScale);
- value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
- currentScale += scaleAdjust;
- }
- } else if (scale < currentScale) {
- Int128 remainder; // receives the remainder of each division; it is not used afterwards
- while(currentScale > scale) { // divide in steps of at most 10^18
- uint32_t scaleAdjust =
- std::min(Decimal64ColumnReader::MAX_PRECISION_64,
- currentScale - scale);
- value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust],
- remainder);
- currentScale -= scaleAdjust;
- }
- }
- }
-
- class Decimal128ColumnReader: public Decimal64ColumnReader { // same stream handling as Decimal64ColumnReader, but values are decoded into Int128
- public:
- Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
- ~Decimal128ColumnReader();
-
- void next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) override;
-
- private:
- void readInt128(Int128& value, int32_t currentScale) { // reads one zigzag varint into value and rescales it from currentScale to the column scale
- value = 0;
- Int128 work;
- uint32_t offset = 0;
- while (true) {
- readBuffer();
- unsigned char ch = static_cast<unsigned char>(*(buffer++));
- work = ch & 0x7f; // low 7 bits are payload
- work <<= offset;
- value |= work;
- offset += 7;
- if (!(ch & 0x80)) { // a clear high bit marks the last byte of the varint
- break;
- }
- }
- unZigZagInt128(value);
- scaleInt128(value, static_cast<uint32_t>(scale),
- static_cast<uint32_t>(currentScale));
- }
- };
-
- Decimal128ColumnReader::Decimal128ColumnReader
- (const Type& type,
- StripeStreams& stripe
- ): Decimal64ColumnReader(type, stripe) {
- // PASS: all set-up is done by the Decimal64ColumnReader constructor
- }
-
- Decimal128ColumnReader::~Decimal128ColumnReader() {
- // PASS: nothing to release explicitly
- }
-
- void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, // reads numValues decimals into a Decimal128VectorBatch
- uint64_t numValues,
- char *notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0; // from here on use the batch's own null mask, if it has one
- Decimal128VectorBatch &batch =
- dynamic_cast<Decimal128VectorBatch&>(rowBatch); // throws std::bad_cast if the caller passed a different batch type
- Int128* values = batch.values.data();
- // read the next group of scales
- int64_t* scaleBuffer = batch.readScales.data();
- scaleDecoder->next(scaleBuffer, numValues, notNull);
- batch.precision = precision;
- batch.scale = scale;
- if (notNull) {
- for(size_t i=0; i < numValues; ++i) {
- if (notNull[i]) { // null rows are not read from the value stream
- readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
- }
- }
- } else {
- for(size_t i=0; i < numValues; ++i) {
- readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
- }
- }
- }
-
- class DecimalHive11ColumnReader: public Decimal64ColumnReader { // decimal reader used by buildReader when the type reports precision 0
- private:
- bool throwOnOverflow; // true: overflow raises ParseError; false: the value is replaced by NULL with a warning
- std::ostream* errorStream; // where overflow warnings are written
-
- /**
- * Read an Int128 from the stream and correct it to the desired scale.
- * @param value receives the decoded value
- * @param currentScale the scale the value was written with
- * @return false if the encoded number needs more than 128 bits or the
- * rescaled value lies outside [MIN_VALUE, MAX_VALUE]; true otherwise
- */
- bool readInt128(Int128& value, int32_t currentScale) {
- // -/+ 99999999999999999999999999999999999999
- static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
- static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff);
-
- value = 0;
- Int128 work;
- uint32_t offset = 0;
- bool result = true;
- while (true) {
- readBuffer();
- unsigned char ch = static_cast<unsigned char>(*(buffer++));
- work = ch & 0x7f;
- // If we have read more than 128 bits, we flag the error, but keep
- // reading bytes so the stream isn't thrown off.
- if (offset > 128 || (offset == 126 && work > 3)) { // at offset 126 only two payload bits still fit below bit 128
- result = false;
- }
- work <<= offset;
- value |= work;
- offset += 7;
- if (!(ch & 0x80)) {
- break;
- }
- }
-
- if (!result) {
- return result; // overflowed: value is left neither un-zigzagged nor rescaled
- }
- unZigZagInt128(value);
- scaleInt128(value, static_cast<uint32_t>(scale),
- static_cast<uint32_t>(currentScale));
- return value >= MIN_VALUE && value <= MAX_VALUE;
- }
-
- public:
- DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
- ~DecimalHive11ColumnReader();
-
- void next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) override;
- };
-
- /**
-  * Build a reader for decimals written without a declared precision.
-  * The scale to convert to and the overflow policy are taken from the
-  * reader options instead of from the type.
-  * @param type the decimal type being read
-  * @param stripe the stripe that supplies streams and reader options
-  */
- DecimalHive11ColumnReader::DecimalHive11ColumnReader
-                        (const Type& type,
-                         StripeStreams& stripe
-                         ): Decimal64ColumnReader(type, stripe) {
-   // getReaderOptions() returns a const reference; bind to it instead of
-   // copying the whole options object, so that the error stream pointer
-   // is read from the caller's options and not from a temporary copy.
-   const ReaderOptions& options = stripe.getReaderOptions();
-   scale = options.getForcedScaleOnHive11Decimal();
-   throwOnOverflow = options.getThrowOnHive11DecimalOverflow();
-   errorStream = options.getErrorStream();
- }
-
- DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
- // PASS: nothing to release; errorStream is not deleted here
- }
-
- void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, // reads numValues decimals into a Decimal128VectorBatch, applying the overflow policy
- uint64_t numValues,
- char *notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0; // from here on use the batch's own null mask, if it has one
- Decimal128VectorBatch &batch =
- dynamic_cast<Decimal128VectorBatch&>(rowBatch); // throws std::bad_cast if the caller passed a different batch type
- Int128* values = batch.values.data();
- // read the next group of scales
- int64_t* scaleBuffer = batch.readScales.data();
-
- scaleDecoder->next(scaleBuffer, numValues, notNull);
-
- batch.precision = precision;
- batch.scale = scale; // the forced scale set in the constructor
- if (notNull) {
- for(size_t i=0; i < numValues; ++i) {
- if (notNull[i]) {
- if (!readInt128(values[i],
- static_cast<int32_t>(scaleBuffer[i]))) { // false means the value overflowed
- if (throwOnOverflow) {
- throw ParseError("Hive 0.11 decimal was more than 38 digits.");
- } else {
- *errorStream << "Warning: "
- << "Hive 0.11 decimal with more than 38 digits "
- << "replaced by NULL.\n";
- notNull[i] = false; // the mask is the batch's own array, so this marks the row NULL
- }
- }
- }
- }
- } else {
- for(size_t i=0; i < numValues; ++i) {
- if (!readInt128(values[i],
- static_cast<int32_t>(scaleBuffer[i]))) {
- if (throwOnOverflow) {
- throw ParseError("Hive 0.11 decimal was more than 38 digits.");
- } else {
- *errorStream << "Warning: "
- << "Hive 0.11 decimal with more than 38 digits "
- << "replaced by NULL.\n";
- batch.hasNulls = true; // the batch had no nulls so far; switch the mask on
- batch.notNull[i] = false; // NOTE(review): the other notNull entries are not written here - presumably already non-zero; confirm
- }
- }
- }
- }
- }
-
- /**
- * Create a reader for the given stripe.
- * Dispatches on the kind of the type, and for string-like types also on
- * the column encoding of the stripe.
- * @param type the type of the column to read
- * @param stripe the stripe that supplies encodings and streams
- * @return a newly allocated reader for the column
- * @throws NotImplementedYet for a type kind or string encoding that is
- * not handled below
- */
- std::unique_ptr<ColumnReader> buildReader(const Type& type,
- StripeStreams& stripe) {
- switch (static_cast<int64_t>(type.getKind())) {
- case DATE:
- case INT:
- case LONG:
- case SHORT:
- return std::unique_ptr<ColumnReader>(
- new IntegerColumnReader(type, stripe));
- case BINARY:
- case CHAR:
- case STRING:
- case VARCHAR:
- switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){
- case proto::ColumnEncoding_Kind_DICTIONARY:
- case proto::ColumnEncoding_Kind_DICTIONARY_V2:
- return std::unique_ptr<ColumnReader>(
- new StringDictionaryColumnReader(type, stripe));
- case proto::ColumnEncoding_Kind_DIRECT:
- case proto::ColumnEncoding_Kind_DIRECT_V2:
- return std::unique_ptr<ColumnReader>(
- new StringDirectColumnReader(type, stripe));
- default:
- throw NotImplementedYet("buildReader unhandled string encoding");
- }
-
- case BOOLEAN:
- return std::unique_ptr<ColumnReader>(
- new BooleanColumnReader(type, stripe));
-
- case BYTE:
- return std::unique_ptr<ColumnReader>(
- new ByteColumnReader(type, stripe));
-
- case LIST:
- return std::unique_ptr<ColumnReader>(
- new ListColumnReader(type, stripe));
-
- case MAP:
- return std::unique_ptr<ColumnReader>(
- new MapColumnReader(type, stripe));
-
- case UNION:
- return std::unique_ptr<ColumnReader>(
- new UnionColumnReader(type, stripe));
-
- case STRUCT:
- return std::unique_ptr<ColumnReader>(
- new StructColumnReader(type, stripe));
-
- case FLOAT:
- case DOUBLE: // FLOAT and DOUBLE share one reader class
- return std::unique_ptr<ColumnReader>(
- new DoubleColumnReader(type, stripe));
-
- case TIMESTAMP:
- return std::unique_ptr<ColumnReader>
- (new TimestampColumnReader(type, stripe));
-
- case DECIMAL:
- // is this a Hive 0.11 or 0.12 file?
- if (type.getPrecision() == 0) { // precision 0 is what selects the Hive 0.11 reader
- return std::unique_ptr<ColumnReader>
- (new DecimalHive11ColumnReader(type, stripe));
-
- // can we represent the values using int64_t?
- } else if (type.getPrecision() <=
- Decimal64ColumnReader::MAX_PRECISION_64) {
- return std::unique_ptr<ColumnReader>
- (new Decimal64ColumnReader(type, stripe));
-
- // otherwise we use the Int128 implementation
- } else {
- return std::unique_ptr<ColumnReader>
- (new Decimal128ColumnReader(type, stripe));
- }
-
- default:
- throw NotImplementedYet("buildReader unhandled type");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/ColumnReader.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/ColumnReader.hh b/c++/src/orc/ColumnReader.hh
deleted file mode 100644
index b90c942..0000000
--- a/c++/src/orc/ColumnReader.hh
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ORC_COLUMN_READER_HH
-#define ORC_COLUMN_READER_HH
-
-#include "orc/Vector.hh"
-#include "ByteRLE.hh"
-#include "Compression.hh"
-#include "wrap/orc-proto-wrapper.hh"
-
-namespace orc {
-
- class StripeStreams { // abstract source of per-stripe information that column readers are built from
- public:
- virtual ~StripeStreams();
-
- /**
- * Get the reader options.
- */
- virtual const ReaderOptions& getReaderOptions() const = 0;
-
- /**
- * Get the array of booleans for which columns are selected.
- * @return a vector, indexed by columnId, that contains true for each
- * column that is selected.
- */
- virtual const std::vector<bool> getSelectedColumns() const = 0;
-
- /**
- * Get the encoding for the given column for this stripe.
- */
- virtual proto::ColumnEncoding getEncoding(int64_t columnId) const = 0;
-
- /**
- * Get the stream for the given column/kind in this stripe.
- * @param columnId the id of the column
- * @param kind the kind of the stream
- * @param shouldStream whether the reader should page the stream in
- * @return the new stream
- */
- virtual std::unique_ptr<SeekableInputStream>
- getStream(int64_t columnId,
- proto::Stream_Kind kind,
- bool shouldStream) const = 0;
-
- /**
- * Get the memory pool for this reader.
- */
- virtual MemoryPool& getMemoryPool() const = 0;
- };
-
- /**
- * The interface for reading ORC data types.
- * Concrete readers override skip() and next() and call the base
- * versions first.
- */
- class ColumnReader {
- protected:
- std::unique_ptr<ByteRleDecoder> notNullDecoder; // NOTE(review): presumably decodes the column's null mask - confirm in ColumnReader.cc
- int64_t columnId; // id of the column this reader reads; used to look up its streams
- MemoryPool& memoryPool;
-
- public:
- ColumnReader(const Type& type, StripeStreams& stipe);
-
- virtual ~ColumnReader();
-
- /**
- * Skip number of specified rows.
- * @param numValues the number of values to skip
- * @return the number of non-null values skipped
- */
- virtual uint64_t skip(uint64_t numValues);
-
- /**
- * Read the next group of values into this rowBatch.
- * @param rowBatch the memory to read into.
- * @param numValues the number of values to read
- * @param notNull if null, all values are not null. Otherwise, it is
- * a mask (with at least numValues bytes) for which values to
- * set.
- */
- virtual void next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char* notNull);
- };
-
- /**
- * Create a reader for the given stripe.
- * @param type the type of the column to read
- * @param stripe the stripe to read the column from
- * @return the new reader, owned by the caller
- */
- std::unique_ptr<ColumnReader> buildReader(const Type& type,
- StripeStreams& stripe);
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.cc b/c++/src/orc/Compression.cc
deleted file mode 100644
index 06e10e0..0000000
--- a/c++/src/orc/Compression.cc
+++ /dev/null
@@ -1,751 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "orc/Adaptor.hh"
-#include "Compression.hh"
-#include "Exceptions.hh"
-
-#include <algorithm>
-#include <iomanip>
-#include <iostream>
-#include <sstream>
-
-#include "zlib.h"
-
-#include "wrap/snappy-wrapper.h"
-
-namespace orc {
-
- void printBuffer(std::ostream& out, // hex-dumps length bytes of buffer to out, 24 bytes per line, each line prefixed with its byte offset
- const char *buffer,
- uint64_t length) {
- const uint64_t width = 24; // bytes shown per output line
- out << std::hex;
- for(uint64_t line = 0; line < (length + width - 1) / width; ++line) { // ceil(length / width) lines
- out << std::setfill('0') << std::setw(7) << (line * width); // offset of the first byte on this line
- for(uint64_t byte = 0;
- byte < width && line * width + byte < length; ++byte) {
- out << " " << std::setfill('0') << std::setw(2)
- << static_cast<uint64_t>(0xff & buffer[line * width + // mask so a negative char prints as two hex digits
- byte]);
- }
- out << "\n";
- }
- out << std::dec; // back to decimal; the '0' fill character set above stays in effect on out
- }
-
- PositionProvider::PositionProvider(const std::list<uint64_t>& posns) { // starts at the first position; posns must outlive this object because only an iterator into it is kept
- position = posns.begin();
- }
-
- /**
-  * Return the current position and advance to the following one.
-  * No end-of-list check is made.
-  */
- uint64_t PositionProvider::next() {
-   const uint64_t current = *position;
-   position++;
-   return current;
- }
-
- SeekableInputStream::~SeekableInputStream() {
- // PASS: nothing to release
- }
-
- SeekableArrayInputStream::~SeekableArrayInputStream() {
- // PASS: the data pointer is not freed here
- }
-
- SeekableArrayInputStream::SeekableArrayInputStream // wraps size bytes at values as a stream; only the pointer is stored, the bytes are not copied
- (const unsigned char* values,
- uint64_t size,
- int64_t blkSize
- ): data(reinterpret_cast<const char*>(values)) {
- length = size;
- position = 0;
- blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize); // -1 means: hand out everything in a single block
- }
-
- SeekableArrayInputStream::SeekableArrayInputStream(const char* values, // same as the unsigned char overload, for plain char data
- uint64_t size,
- int64_t blkSize
- ): data(values) {
- length = size;
- position = 0;
- blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize); // -1 means: hand out everything in a single block
- }
-
- /**
-  * Hand out the next block of the array.
-  * @param buffer receives a pointer to the block (untouched at the end)
-  * @param size receives the block length, or 0 at the end
-  * @return true if a non-empty block was returned
-  */
- bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
-   const uint64_t available = std::min(length - position, blockSize);
-   if (available == 0) {
-     *size = 0;
-     return false;
-   }
-   *buffer = data + position;
-   *size = static_cast<int>(available);
-   position += available;
-   return true;
- }
-
- /**
-  * Move the read position back by count bytes.
-  * A negative count is ignored.
-  * @throws std::logic_error if count exceeds the block size or the
-  *   current position
-  */
- void SeekableArrayInputStream::BackUp(int count) {
-   if (count < 0) {
-     return;
-   }
-   const uint64_t distance = static_cast<uint64_t>(count);
-   if (distance > blockSize || distance > position) {
-     throw std::logic_error("Can't backup that much!");
-   }
-   position -= distance;
- }
-
- /**
-  * Advance the read position by count bytes.
-  * @return true if all count bytes were available; false for a negative
-  *   count, or when the end was hit (the position is then moved to the
-  *   end of the array)
-  */
- bool SeekableArrayInputStream::Skip(int count) {
-   if (count < 0) {
-     return false;
-   }
-   const uint64_t distance = static_cast<uint64_t>(count);
-   if (distance + position > length) {
-     position = length;
-     return false;
-   }
-   position += distance;
-   return true;
- }
-
- google::protobuf::int64 SeekableArrayInputStream::ByteCount() const { // current read position, i.e. bytes handed out minus bytes backed up
- return static_cast<google::protobuf::int64>(position);
- }
-
- void SeekableArrayInputStream::seek(PositionProvider& seekPosition) { // jumps to the next position from the provider; it is not checked against length
- position = seekPosition.next();
- }
-
- std::string SeekableArrayInputStream::getName() const { // descriptive name that includes the current position and total length
- std::ostringstream result;
- result << "SeekableArrayInputStream " << position << " of " << length;
- return result.str();
- }
-
- static uint64_t computeBlock(int64_t request, uint64_t length) { // block size to use: the request (256 KiB if negative), capped at length
- return std::min(length,
- static_cast<uint64_t>(request < 0 ?
- 256 * 1024 : request));
- }
-
- SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, // streams byteCount bytes of stream, starting at offset, in blocks
- uint64_t offset,
- uint64_t byteCount,
- MemoryPool& _pool,
- int64_t _blockSize
- ):pool(_pool),
- input(stream),
- start(offset),
- length(byteCount),
- blockSize(computeBlock // a negative _blockSize selects the default; never larger than length
- (_blockSize,
- length)) {
-
- position = 0; // offset relative to start
- buffer.reset(new DataBuffer<char>(pool));
- pushBack = 0; // bytes at the end of buffer to hand out again on the next Next()
- }
-
- SeekableFileInputStream::~SeekableFileInputStream() {
- // PASS: nothing to release explicitly
- }
-
- bool SeekableFileInputStream::Next(const void** data, int*size) { // returns the pushed-back tail if there is one, otherwise reads the next block from the file
- uint64_t bytesRead;
- if (pushBack != 0) {
- *data = buffer->data() + (buffer->size() - pushBack); // the last pushBack bytes of the previous block
- bytesRead = pushBack;
- } else {
- bytesRead = std::min(length - position, blockSize);
- buffer->resize(bytesRead);
- if (bytesRead > 0) { // at the end of the range nothing is read and *data is left untouched
- input->read(buffer->data(), bytesRead, start+position);
- *data = static_cast<void*>(buffer->data());
- }
- }
- position += bytesRead;
- pushBack = 0;
- *size = static_cast<int>(bytesRead);
- return bytesRead != 0; // false once the end of the range is reached
- }
-
- void SeekableFileInputStream::BackUp(int signedCount) { // un-reads the last signedCount bytes so that the next Next() returns them again; throws std::logic_error on misuse
- if (signedCount < 0) {
- throw std::logic_error("can't backup negative distances");
- }
- uint64_t count = static_cast<uint64_t>(signedCount);
- if (pushBack > 0) { // a second BackUp without an intervening Next is rejected
- throw std::logic_error("can't backup unless we just called Next");
- }
- if (count > blockSize || count > position) {
- throw std::logic_error("can't backup that far");
- }
- pushBack = static_cast<uint64_t>(count);
- position -= pushBack;
- }
-
- bool SeekableFileInputStream::Skip(int signedCount) { // advances by signedCount bytes, clamped to the end; returns whether bytes remain afterwards
- if (signedCount < 0) {
- return false;
- }
- uint64_t count = static_cast<uint64_t>(signedCount);
- position = std::min(position + count, length);
- pushBack = 0; // any pushed-back bytes are discarded
- return position < length; // note: also false when the skip lands exactly on the end
- }
-
- int64_t SeekableFileInputStream::ByteCount() const { // current position relative to the start offset
- return static_cast<int64_t>(position);
- }
-
- void SeekableFileInputStream::seek(PositionProvider& location) { // jumps to the next position from the provider; throws std::logic_error if it lies beyond length
- position = location.next();
- if (position > length) {
- position = length; // leave the stream at its end before reporting the error
- throw std::logic_error("seek too far");
- }
- pushBack = 0; // pushed-back bytes belong to the old position
- }
-
- std::string SeekableFileInputStream::getName() const { // "<input name> from <start> for <length>"
- std::ostringstream result;
- result << input->getName() << " from " << start << " for "
- << length;
- return result.str();
- }
-
- enum DecompressState { DECOMPRESS_HEADER, // a chunk header has to be read next
- DECOMPRESS_START, // header read; the chunk is compressed
- DECOMPRESS_CONTINUE, // not assigned by the decompression code visible in this part of the file
- DECOMPRESS_ORIGINAL, // header read; the chunk is stored uncompressed
- DECOMPRESS_EOF}; // the underlying stream is exhausted
-
- class ZlibDecompressionStream: public SeekableInputStream { // decodes a stream of header-prefixed chunks, inflating the compressed ones with zlib
- public:
- ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
- size_t blockSize,
- MemoryPool& pool);
- virtual ~ZlibDecompressionStream();
- virtual bool Next(const void** data, int*size) override;
- virtual void BackUp(int count) override;
- virtual bool Skip(int count) override;
- virtual int64_t ByteCount() const override;
- virtual void seek(PositionProvider& position) override;
- virtual std::string getName() const override;
-
- private:
- void readBuffer(bool failOnEof) { // fetches the next block from input; at end of stream either throws ParseError or switches to DECOMPRESS_EOF
- int length;
- if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
- &length)) {
- if (failOnEof) {
- throw ParseError("Read past EOF in "
- "ZlibDecompressionStream::readBuffer");
- }
- state = DECOMPRESS_EOF;
- inputBuffer = nullptr;
- inputBufferEnd = nullptr;
- } else {
- inputBufferEnd = inputBuffer + length;
- }
- }
-
- uint32_t readByte(bool failOnEof) { // returns the next input byte, or 0 after switching to DECOMPRESS_EOF
- if (inputBuffer == inputBufferEnd) {
- readBuffer(failOnEof);
- if (state == DECOMPRESS_EOF) {
- return 0;
- }
- }
- return static_cast<unsigned char>(*(inputBuffer++));
- }
-
- void readHeader() { // reads the 3-byte little-endian chunk header and sets state and remainingLength from it
- uint32_t header = readByte(false); // end of stream is legal only before the first header byte
- if (state != DECOMPRESS_EOF) {
- header |= readByte(true) << 8;
- header |= readByte(true) << 16;
- if (header & 1) { // lowest bit set: the chunk is stored uncompressed
- state = DECOMPRESS_ORIGINAL;
- } else {
- state = DECOMPRESS_START;
- }
- remainingLength = header >> 1; // the remaining bits are the chunk length in bytes
- } else {
- remainingLength = 0;
- }
- }
-
- MemoryPool& pool;
- const size_t blockSize;
- std::unique_ptr<SeekableInputStream> input;
- z_stream zstream;
- DataBuffer<char> buffer; // inflate output buffer of blockSize bytes
-
- // the current state
- DecompressState state;
-
- // the start of the current buffer
- // This pointer is not owned by us. It is either owned by zstream or
- // the underlying stream.
- const char* outputBuffer;
- // the size of the current buffer
- size_t outputBufferLength;
- // the size of the current chunk
- size_t remainingLength;
-
- // the last buffer returned from the input
- const char *inputBuffer;
- const char *inputBufferEnd;
-
- // roughly the number of bytes returned
- off_t bytesReturned;
- };
-
-DIAGNOSTIC_PUSH
-DIAGNOSTIC_IGNORE("-Wold-style-cast")
-
- ZlibDecompressionStream::ZlibDecompressionStream // takes ownership of inStream and prepares zlib; throws std::logic_error if zlib cannot be initialised
- (std::unique_ptr<SeekableInputStream> inStream,
- size_t _blockSize,
- MemoryPool& _pool
- ): pool(_pool),
- blockSize(_blockSize),
- buffer(pool, _blockSize) {
- input.reset(inStream.release());
- zstream.next_in = Z_NULL;
- zstream.avail_in = 0;
- zstream.zalloc = Z_NULL; // Z_NULL allocators: zlib uses its defaults
- zstream.zfree = Z_NULL;
- zstream.opaque = Z_NULL;
- zstream.next_out = reinterpret_cast<Bytef*>(buffer.data());
- zstream.avail_out = static_cast<uInt>(blockSize);
- int64_t result = inflateInit2(&zstream, -15); // negative windowBits: raw deflate data without a zlib header
- switch (result) {
- case Z_OK:
- break;
- case Z_MEM_ERROR:
- throw std::logic_error("Memory error from inflateInit2");
- case Z_VERSION_ERROR:
- throw std::logic_error("Version error from inflateInit2");
- case Z_STREAM_ERROR:
- throw std::logic_error("Stream error from inflateInit2");
- default:
- throw std::logic_error("Unknown error from inflateInit2");
- }
- outputBuffer = nullptr;
- outputBufferLength = 0;
- remainingLength = 0;
- state = DECOMPRESS_HEADER; // the first Next() starts by reading a chunk header
- inputBuffer = nullptr;
- inputBufferEnd = nullptr;
- bytesReturned = 0;
- }
-
-DIAGNOSTIC_POP
-
- ZlibDecompressionStream::~ZlibDecompressionStream() { // releases the zlib state; a failure is only reported on std::cout
- int64_t result = inflateEnd(&zstream);
- if (result != Z_OK) {
- // really can't throw in destructors
- std::cout << "Error in ~ZlibDecompressionStream() " << result << "\n";
- }
- }
-
- bool ZlibDecompressionStream::Next(const void** data, int*size) { // returns the next run of decoded bytes; false at end of stream
- // if the user pushed back, return them the partial buffer
- if (outputBufferLength) {
- *data = outputBuffer;
- *size = static_cast<int>(outputBufferLength);
- outputBuffer += outputBufferLength;
- outputBufferLength = 0;
- return true; // note: bytesReturned is not advanced on this path
- }
- if (state == DECOMPRESS_HEADER || remainingLength == 0) { // start of the stream, or the previous chunk is used up
- readHeader();
- }
- if (state == DECOMPRESS_EOF) {
- return false;
- }
- if (inputBuffer == inputBufferEnd) {
- readBuffer(true);
- }
- size_t availSize = // input bytes of this chunk that are available in the current input block
- std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
- remainingLength);
- if (state == DECOMPRESS_ORIGINAL) { // uncompressed chunk: hand out the input bytes directly, without copying
- *data = inputBuffer;
- *size = static_cast<int>(availSize);
- outputBuffer = inputBuffer + availSize; // points just past the returned bytes so that BackUp can step back
- outputBufferLength = 0;
- } else if (state == DECOMPRESS_START) { // compressed chunk: inflate all of it into buffer
- zstream.next_in =
- reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
- zstream.avail_in = static_cast<uInt>(availSize);
- outputBuffer = buffer.data();
- zstream.next_out =
- reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
- zstream.avail_out = static_cast<uInt>(blockSize);
- if (inflateReset(&zstream) != Z_OK) { // every chunk is inflated from a fresh zlib state
- throw std::logic_error("Bad inflateReset in "
- "ZlibDecompressionStream::Next");
- }
- int64_t result;
- do {
- result = inflate(&zstream, availSize == remainingLength ? Z_FINISH : // Z_FINISH once the rest of the chunk is available
- Z_SYNC_FLUSH);
- switch (result) {
- case Z_OK: // chunk not finished yet: account for the consumed input and feed the next input block
- remainingLength -= availSize;
- inputBuffer += availSize;
- readBuffer(true);
- availSize =
- std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
- remainingLength);
- zstream.next_in =
- reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
- zstream.avail_in = static_cast<uInt>(availSize);
- break;
- case Z_STREAM_END:
- break;
- case Z_BUF_ERROR:
- throw std::logic_error("Buffer error in "
- "ZlibDecompressionStream::Next");
- case Z_DATA_ERROR:
- throw std::logic_error("Data error in "
- "ZlibDecompressionStream::Next");
- case Z_STREAM_ERROR:
- throw std::logic_error("Stream error in "
- "ZlibDecompressionStream::Next");
- default:
- throw std::logic_error("Unknown error in "
- "ZlibDecompressionStream::Next");
- }
- } while (result != Z_STREAM_END);
- *size = static_cast<int>(blockSize - zstream.avail_out); // number of bytes inflate wrote into buffer
- *data = outputBuffer;
- outputBufferLength = 0;
- outputBuffer += *size; // points just past the returned bytes so that BackUp can step back
- } else {
- throw std::logic_error("Unknown compression state in "
- "ZlibDecompressionStream::Next");
- }
- inputBuffer += availSize; // account for the input consumed by the last (or only) step
- remainingLength -= availSize;
- bytesReturned += *size;
- return true;
- }
-
- void ZlibDecompressionStream::BackUp(int count) { // un-reads the last count bytes returned by Next so that the next Next() returns them again
- if (outputBuffer == nullptr || outputBufferLength != 0) { // no Next() yet, or a push-back is already pending
- throw std::logic_error("Backup without previous Next in "
- "ZlibDecompressionStream");
- }
- outputBuffer -= static_cast<size_t>(count); // count is not validated against the size of the last returned buffer
- outputBufferLength = static_cast<size_t>(count);
- bytesReturned -= count;
- }
-
- bool ZlibDecompressionStream::Skip(int count) { // skips count decoded bytes by reading and discarding them; false if the stream ends first
- bytesReturned += count; // NOTE(review): Next() below also adds to bytesReturned, so ByteCount() may over-count after a Skip - confirm whether callers rely on it
- // this is a stupid implementation for now.
- // should skip entire blocks without decompressing
- while (count > 0) {
- const void *ptr;
- int len;
- if (!Next(&ptr, &len)) {
- return false;
- }
- if (len > count) { // read too far: give the surplus back
- BackUp(len - count);
- count = 0;
- } else {
- count -= len;
- }
- }
- return true;
- }
-
- int64_t ZlibDecompressionStream::ByteCount() const { // returns the bytesReturned counter, which the class itself describes as only approximate
- return bytesReturned;
- }
-
- /**
-  * Move to a new position in the decoded stream.
-  * The provider supplies the position(s) for the underlying stream first
-  * and then the number of decoded bytes to skip from there.
-  * @param position the provider of the positions to seek to
-  * @throws ParseError if the decoded bytes cannot be skipped
-  */
- void ZlibDecompressionStream::seek(PositionProvider& position) {
-   // Forget everything that refers to the old location. Otherwise the
-   // next Next() would return pushed-back output, continue the old chunk
-   // or read from the old input block instead of reading a chunk header
-   // at the new position.
-   state = DECOMPRESS_HEADER;
-   outputBuffer = nullptr;
-   outputBufferLength = 0;
-   remainingLength = 0;
-   inputBuffer = nullptr;
-   inputBufferEnd = nullptr;
-
-   input->seek(position);
-   bytesReturned = input->ByteCount();
-   if (!Skip(static_cast<int>(position.next()))) {
-     throw ParseError("Bad skip in ZlibDecompressionStream::seek");
-   }
- }
-
- std::string ZlibDecompressionStream::getName() const { // "zlib(<name of the underlying stream>)"
- std::ostringstream result;
- result << "zlib(" << input->getName() << ")";
- return result.str();
- }
-
- class SnappyDecompressionStream: public SeekableInputStream { // Snappy counterpart of ZlibDecompressionStream with the same chunk header handling
- public:
- SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
- size_t blockSize,
- MemoryPool& pool);
-
- virtual ~SnappyDecompressionStream() {}
- virtual bool Next(const void** data, int*size) override;
- virtual void BackUp(int count) override;
- virtual bool Skip(int count) override;
- virtual int64_t ByteCount() const override;
- virtual void seek(PositionProvider& position) override;
- virtual std::string getName() const override;
-
- private:
- void readBuffer(bool failOnEof) { // fetches the next block from input; at end of stream either throws ParseError or switches to DECOMPRESS_EOF
- int length;
- if (!input->Next(reinterpret_cast<const void**>(&inputBufferPtr),
- &length)) {
- if (failOnEof) {
- throw ParseError("SnappyDecompressionStream read past EOF");
- }
- state = DECOMPRESS_EOF;
- inputBufferPtr = nullptr;
- inputBufferPtrEnd = nullptr;
- } else {
- inputBufferPtrEnd = inputBufferPtr + length;
- }
- }
-
- uint32_t readByte(bool failOnEof) { // returns the next input byte, or 0 after switching to DECOMPRESS_EOF
- if (inputBufferPtr == inputBufferPtrEnd) {
- readBuffer(failOnEof);
- if (state == DECOMPRESS_EOF) {
- return 0;
- }
- }
- return static_cast<unsigned char>(*(inputBufferPtr++));
- }
-
- void readHeader() { // reads the 3-byte little-endian chunk header and sets state and remainingLength from it
- uint32_t header = readByte(false); // end of stream is legal only before the first header byte
- if (state != DECOMPRESS_EOF) {
- header |= readByte(true) << 8;
- header |= readByte(true) << 16;
- if (header & 1) { // lowest bit set: the chunk is stored uncompressed
- state = DECOMPRESS_ORIGINAL;
- } else {
- state = DECOMPRESS_START;
- }
- remainingLength = header >> 1; // the remaining bits are the chunk length in bytes
- } else {
- remainingLength = 0;
- }
- }
-
- std::unique_ptr<SeekableInputStream> input;
- MemoryPool& pool;
-
- // may need to stitch together multiple input buffers;
- // to give snappy a contiguous block
- DataBuffer<char> inputBuffer;
-
- // uncompressed output
- DataBuffer<char> outputBuffer;
-
- // the current state
- DecompressState state;
-
- // the start of the current output buffer
- const char* outputBufferPtr;
- // the size of the current output buffer
- size_t outputBufferLength;
-
- // the size of the current chunk
- size_t remainingLength;
-
- // the last buffer returned from the input
- const char *inputBufferPtr;
- const char *inputBufferPtrEnd;
-
- // bytes returned by this stream
- off_t bytesReturned;
- };
-
- /**
-  * Build a Snappy decoder on top of an existing stream.
-  * @param inStream the compressed stream; ownership moves into this object
-  * @param bufferSize the size handed to both the stitching buffer and the
-  *        output buffer
-  * @param _pool the memory pool both buffers are created from
-  */
- SnappyDecompressionStream::SnappyDecompressionStream
-     (std::unique_ptr<SeekableInputStream> inStream,
-      size_t bufferSize,
-      MemoryPool& _pool)
-   : input(std::move(inStream)),
-     pool(_pool),
-     inputBuffer(pool, bufferSize),
-     outputBuffer(pool, bufferSize),
-     state(DECOMPRESS_HEADER),
-     outputBufferPtr(nullptr),
-     outputBufferLength(0),
-     remainingLength(0),
-     inputBufferPtr(nullptr),
-     inputBufferPtrEnd(nullptr),
-     bytesReturned(0) {
-   // everything is set up by the initializer list
- }
-
- /**
-  * Hand out the next run of decoded bytes.
-  * Bytes pushed back with BackUp() are returned first. Otherwise a chunk
-  * header is read when needed; an uncompressed chunk is served straight
-  * from the input buffer, while a Snappy chunk is gathered into one
-  * contiguous block, decompressed into outputBuffer and served from there.
-  * @param data receives the start of the returned bytes
-  * @param size receives the number of returned bytes
-  * @return false once the input is exhausted, true otherwise
-  * @throws ParseError on truncated or corrupt input
-  * @throws std::logic_error if a chunk decompresses to more than the
-  *         output buffer can hold
-  */
- bool SnappyDecompressionStream::Next(const void** data, int*size) {
-   // serve whatever the caller gave back through BackUp() first
-   if (outputBufferLength != 0) {
-     const size_t pending = outputBufferLength;
-     *data = outputBufferPtr;
-     *size = static_cast<int>(pending);
-     outputBufferPtr += pending;
-     bytesReturned += pending;
-     outputBufferLength = 0;
-     return true;
-   }
-
-   const bool chunkFinished =
-     (state == DECOMPRESS_HEADER) || (remainingLength == 0);
-   if (chunkFinished) {
-     readHeader();
-   }
-   if (state == DECOMPRESS_EOF) {
-     return false;
-   }
-   if (inputBufferPtr == inputBufferPtrEnd) {
-     readBuffer(true);
-   }
-
-   // how much of the current chunk is already sitting in the input buffer
-   const size_t inHand =
-     std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
-              remainingLength);
-
-   if (state == DECOMPRESS_ORIGINAL) {
-     // stored chunk: expose the input bytes directly
-     *data = inputBufferPtr;
-     *size = static_cast<int>(inHand);
-     inputBufferPtr += inHand;
-     outputBufferPtr = inputBufferPtr;
-     outputBufferLength = 0;
-     remainingLength -= inHand;
-   } else if (state == DECOMPRESS_START) {
-     // snappy needs the whole compressed chunk in one piece
-     const size_t chunkLength = remainingLength;
-     const char *compressed = inputBufferPtr;
-     if (chunkLength == inHand) {
-       inputBufferPtr += inHand;
-     } else {
-       // the chunk straddles input buffers: stitch it together
-       if (inputBuffer.capacity() < chunkLength) {
-         inputBuffer.resize(chunkLength);
-       }
-       char *const stitched = inputBuffer.data();
-       ::memcpy(stitched, inputBufferPtr, inHand);
-       inputBufferPtr += inHand;
-       compressed = stitched;
-
-       size_t filled = inHand;
-       while (filled < chunkLength) {
-         readBuffer(true);
-         const size_t piece =
-           std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
-                    chunkLength - filled);
-         ::memcpy(stitched + filled, inputBufferPtr, piece);
-         filled += piece;
-         inputBufferPtr += piece;
-       }
-     }
-
-     if (!snappy::GetUncompressedLength(compressed, chunkLength,
-                                        &outputBufferLength)) {
-       throw ParseError("SnappyDecompressionStream choked on corrupt input");
-     }
-
-     if (outputBufferLength > outputBuffer.capacity()) {
-       throw std::logic_error("uncompressed length exceeds block size");
-     }
-
-     if (!snappy::RawUncompress(compressed, chunkLength,
-                                outputBuffer.data())) {
-       throw ParseError("SnappyDecompressionStream choked on corrupt input");
-     }
-
-     // the whole chunk is consumed; the next call starts at a header
-     remainingLength = 0;
-     state = DECOMPRESS_HEADER;
-     *data = outputBuffer.data();
-     *size = static_cast<int>(outputBufferLength);
-     outputBufferPtr = outputBuffer.data() + outputBufferLength;
-     outputBufferLength = 0;
-   }
-
-   bytesReturned += *size;
-   return true;
- }
-
- /**
-  * Give back the trailing bytes of the buffer last returned by Next().
-  * @param count the number of bytes to return to the stream
-  * @throws std::logic_error if nothing has been handed out yet or a
-  *         previous push-back is still pending
-  */
- void SnappyDecompressionStream::BackUp(int count) {
-   const bool nothingHandedOut = (outputBufferPtr == nullptr);
-   const bool pushBackPending = (outputBufferLength != 0);
-   if (nothingHandedOut || pushBackPending) {
-     throw std::logic_error("Backup without previous Next in "
-                            "SnappyDecompressionStream");
-   }
-   const size_t returned = static_cast<size_t>(count);
-   outputBufferLength = returned;
-   outputBufferPtr -= returned;
-   bytesReturned -= count;
- }
-
- /**
-  * Skip forward over decoded bytes.
-  * @param count the number of decoded bytes to skip; values that are not
-  *        positive skip nothing
-  * @return true if all count bytes were skipped, false if the input ran
-  *         out first
-  */
- bool SnappyDecompressionStream::Skip(int count) {
-   // bytesReturned is deliberately not touched here: Next() adds every
-   // byte it hands out and BackUp() subtracts the unused tail, so adding
-   // count as well would count the skipped bytes twice (and would leave
-   // the counter wrong after a failed or negative skip).
-   //
-   // this is a simple implementation for now.
-   // should skip entire blocks without decompressing
-   while (count > 0) {
-     const void *ptr;
-     int len;
-     if (!Next(&ptr, &len)) {
-       return false;
-     }
-     if (len > count) {
-       // took more than asked for: return the surplus to the stream
-       BackUp(len - count);
-       count = 0;
-     } else {
-       count -= len;
-     }
-   }
-   return true;
- }
-
- /**
-  * Report the stream's running byte counter.
-  * @return the current value of bytesReturned
-  */
- int64_t SnappyDecompressionStream::ByteCount() const {
-   const int64_t total = bytesReturned;
-   return total;
- }
-
- /**
-  * Reposition the stream.
-  * The position provider is first handed to the underlying stream, then
-  * the next position value is skipped over in the decoded data.
-  * @param position the source of the seek positions
-  * @throws ParseError if the requested number of bytes cannot be skipped
-  */
- void SnappyDecompressionStream::seek(PositionProvider& position) {
-   // Drop everything tied to the old location. Without this the next read
-   // would keep consuming the previous input buffer, continue a half-read
-   // chunk, or hand out bytes that were pushed back before the seek.
-   state = DECOMPRESS_HEADER;
-   outputBufferPtr = nullptr;
-   outputBufferLength = 0;
-   remainingLength = 0;
-   inputBufferPtr = nullptr;
-   inputBufferPtrEnd = nullptr;
-
-   input->seek(position);
-   // re-base the counter the same way ZlibDecompressionStream::seek does
-   bytesReturned = static_cast<off_t>(input->ByteCount());
-   if (!Skip(static_cast<int>(position.next()))) {
-     throw ParseError("Bad skip in SnappyDecompressionStream::seek");
-   }
- }
-
- /**
-  * Describe this stream for diagnostics.
-  * @return the underlying stream's name wrapped as "snappy(...)"
-  */
- std::string SnappyDecompressionStream::getName() const {
-   return "snappy(" + input->getName() + ")";
- }
-
- /**
-  * Wrap a stream in the decoder that matches a compression kind.
-  * @param kind the compression kind of the data
-  * @param input the stream to read from; it is returned untouched for
-  *        CompressionKind_NONE and moved into the decoder otherwise
-  * @param blockSize the size passed on to the decoder
-  * @param pool the memory pool passed on to the decoder
-  * @return the stream to read decoded data from
-  * @throws NotImplementedYet for any kind other than NONE, ZLIB or SNAPPY
-  */
- std::unique_ptr<SeekableInputStream>
- createDecompressor(CompressionKind kind,
-                    std::unique_ptr<SeekableInputStream> input,
-                    uint64_t blockSize,
-                    MemoryPool& pool) {
-   if (kind == CompressionKind_NONE) {
-     return std::move(input);
-   }
-
-   std::unique_ptr<SeekableInputStream> decoder;
-   if (kind == CompressionKind_ZLIB) {
-     decoder.reset(new ZlibDecompressionStream(std::move(input),
-                                               blockSize, pool));
-   } else if (kind == CompressionKind_SNAPPY) {
-     decoder.reset(new SnappyDecompressionStream(std::move(input),
-                                                 blockSize, pool));
-   } else {
-     // CompressionKind_LZO and anything unknown
-     throw NotImplementedYet("compression codec");
-   }
-   return decoder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.hh b/c++/src/orc/Compression.hh
deleted file mode 100644
index 222dc54..0000000
--- a/c++/src/orc/Compression.hh
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ORC_COMPRESSION_HH
-#define ORC_COMPRESSION_HH
-
-#include "orc/Adaptor.hh"
-#include "orc/OrcFile.hh"
-#include "wrap/zero-copy-stream-wrapper.h"
-
-#include <list>
-#include <vector>
-#include <fstream>
-#include <iostream>
-#include <sstream>
-#include <memory>
-
-namespace orc {
-
- /**
-  * Print a buffer to an output stream.
-  * NOTE(review): the output format is decided by the implementation,
-  * which is not part of this header - confirm there before relying on it.
-  * @param out the stream to write to
-  * @param buffer the bytes to print (presumably at least length bytes)
-  * @param length the number of bytes in buffer
-  */
- void printBuffer(std::ostream& out,
- const char *buffer,
- uint64_t length);
-
- /**
-  * Hands out positions, one per call, from a list of positions.
-  * Only an iterator is stored, so presumably the list passed to the
-  * constructor has to outlive the provider - confirm in the implementation.
-  */
- class PositionProvider {
- public:
-   PositionProvider(const std::list<uint64_t>& positions);
-
-   // the next position from the list
-   uint64_t next();
-
- private:
-   // where in the list the provider currently is
-   std::list<uint64_t>::const_iterator position;
- };
-
- /**
- * A subclass of Google's ZeroCopyInputStream that adds seeking and a
- * descriptive name.
- * By extending Google's class, a SeekableInputStream can be passed
- * directly to the protobuf readers.
- */
- class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
- public:
- virtual ~SeekableInputStream();
- // Reposition the stream using the values handed out by position.
- virtual void seek(PositionProvider& position) = 0;
- // A name for the stream; wrapping streams embed the name of the
- // stream they read from, e.g. "snappy(...)".
- virtual std::string getName() const = 0;
- };
-
- /**
-  * A seekable input stream that reads from a range of memory.
-  * NOTE(review): whether the range is copied or merely referenced is
-  * decided by the implementation - confirm before freeing the memory.
-  */
- class SeekableArrayInputStream: public SeekableInputStream {
- public:
-   // block_size defaults to -1; presumably that selects a default
-   // block size - confirm in the implementation.
-   SeekableArrayInputStream(const unsigned char* list,
-                            uint64_t length,
-                            int64_t block_size = -1);
-   SeekableArrayInputStream(const char* list,
-                            uint64_t length,
-                            int64_t block_size = -1);
-   virtual ~SeekableArrayInputStream();
-
-   // ZeroCopyInputStream interface
-   bool Next(const void** data, int*size) override;
-   void BackUp(int count) override;
-   bool Skip(int count) override;
-   google::protobuf::int64 ByteCount() const override;
-
-   // SeekableInputStream interface
-   void seek(PositionProvider& position) override;
-   std::string getName() const override;
-
- private:
-   const char* data;
-   uint64_t length;
-   uint64_t position;
-   uint64_t blockSize;
- };
-
- /**
-  * A seekable input stream that reads a byte range of an InputStream.
-  */
- class SeekableFileInputStream: public SeekableInputStream {
- public:
-   // blockSize defaults to -1; presumably that selects a default
-   // block size - confirm in the implementation.
-   SeekableFileInputStream(InputStream* input,
-                           uint64_t offset,
-                           uint64_t byteCount,
-                           MemoryPool& pool,
-                           int64_t blockSize = -1);
-   virtual ~SeekableFileInputStream();
-
-   // ZeroCopyInputStream interface
-   bool Next(const void** data, int*size) override;
-   void BackUp(int count) override;
-   bool Skip(int count) override;
-   int64_t ByteCount() const override;
-
-   // SeekableInputStream interface
-   void seek(PositionProvider& position) override;
-   std::string getName() const override;
-
- private:
-   MemoryPool& pool;
-   InputStream* const input;
-   const uint64_t start;
-   const uint64_t length;
-   const uint64_t blockSize;
-   std::unique_ptr<DataBuffer<char> > buffer;
-   uint64_t position;
-   uint64_t pushBack;
- };
-
- /**
- * Create a decompressor for the given compression kind.
- * For CompressionKind_NONE the input stream itself is returned.
- * @param kind the compression type to implement
- * @param input the input stream that is the underlying source
- * @param bufferSize the maximum size of the buffer
- * @param pool the memory pool
- * @return the stream to read the decompressed data from
- * @throws NotImplementedYet if kind is not NONE, ZLIB or SNAPPY
- */
- std::unique_ptr<SeekableInputStream>
- createDecompressor(CompressionKind kind,
- std::unique_ptr<SeekableInputStream> input,
- uint64_t bufferSize,
- MemoryPool& pool);
-}
-
-#endif