You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink on "here") was not preserved in this copy.
Posted to commits@orc.apache.org by om...@apache.org on 2015/07/27 18:40:37 UTC
orc git commit: ORC-9. Create a vector type for timestamp columns
that separates out the seconds from the nanoseconds.
Repository: orc
Updated Branches:
refs/heads/master 8971cca55 -> 8b7189bd2
ORC-9. Create a vector type for timestamp columns that separates out the
seconds from the nanoseconds.
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/8b7189bd
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/8b7189bd
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/8b7189bd
Branch: refs/heads/master
Commit: 8b7189bd2192335e255d89d516a74a944ed8f9b6
Parents: 8971cca
Author: Owen O'Malley <om...@apache.org>
Authored: Sun Jul 26 11:37:59 2015 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jul 27 09:39:55 2015 -0700
----------------------------------------------------------------------
c++/include/orc/Vector.hh | 18 ++++++++++
c++/src/ColumnPrinter.cc | 32 ++++++-----------
c++/src/ColumnReader.cc | 48 +++++++++++++-------------
c++/src/ColumnReader.hh | 7 ++++
c++/src/Reader.cc | 45 ++++++++++++++++++++----
c++/src/Vector.cc | 28 +++++++++++++++
c++/test/TestColumnPrinter.cc | 38 ++++++++++++++-------
c++/test/TestColumnReader.cc | 70 +++++++++++++++++++++++++-------------
8 files changed, 196 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/include/orc/Vector.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index e97c51e..ff732d0 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -294,6 +294,24 @@ namespace orc {
friend class DecimalHive11ColumnReader;
};
+ /**
+ * A column vector batch for storing timestamp values.
+ * The timestamps are stored split into the time_t value (seconds since
+ * 1 Jan 1970 00:00:00 UTC) and the nanoseconds within that second.
+ */
+ struct TimestampVectorBatch: public ColumnVectorBatch {
+ TimestampVectorBatch(uint64_t capacity, MemoryPool& pool);
+ virtual ~TimestampVectorBatch();
+ std::string toString() const;
+ void resize(uint64_t capacity);
+
+ // the number of seconds past 1 Jan 1970 00:00 UTC (aka time_t)
+ DataBuffer<int64_t> data;
+
+ // the nanoseconds of each value
+ DataBuffer<int64_t> nanoseconds;
+ };
+
}
#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/src/ColumnPrinter.cc
----------------------------------------------------------------------
diff --git a/c++/src/ColumnPrinter.cc b/c++/src/ColumnPrinter.cc
index 190441c..1367b02 100644
--- a/c++/src/ColumnPrinter.cc
+++ b/c++/src/ColumnPrinter.cc
@@ -67,8 +67,8 @@ namespace orc {
class TimestampColumnPrinter: public ColumnPrinter {
private:
- const int64_t* data;
- time_t epoch;
+ const int64_t* seconds;
+ const int64_t* nanoseconds;
public:
TimestampColumnPrinter(std::string&, const Type&);
@@ -667,33 +667,18 @@ namespace orc {
const Type& type
): ColumnPrinter(buffer,
type) {
- struct tm epochTm;
- epochTm.tm_sec = 0;
- epochTm.tm_min = 0;
- epochTm.tm_hour = 0;
- epochTm.tm_mday = 1;
- epochTm.tm_mon = 0;
- epochTm.tm_year = 70;
- epochTm.tm_isdst = 0;
- epoch = mktime(&epochTm);
+ // PASS
}
void TimestampColumnPrinter::printRow(uint64_t rowId) {
- const int64_t NANOS_PER_SECOND = 1000000000;
const int64_t NANO_DIGITS = 9;
if (hasNulls && !notNull[rowId]) {
writeString(buffer, "null");
} else {
- int64_t nanos = data[rowId] % NANOS_PER_SECOND;
- time_t seconds =
- static_cast<time_t>(data[rowId] / NANOS_PER_SECOND) + epoch;
- // make sure the nanos are positive
- if (nanos < 0) {
- seconds -= 1;
- nanos = -nanos;
- }
+ int64_t nanos = nanoseconds[rowId];
+ time_t secs = static_cast<time_t>(seconds[rowId]);
struct tm tmValue;
- localtime_r(&seconds, &tmValue);
+ localtime_r(&secs, &tmValue);
char timeBuffer[20];
strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
writeChar(buffer, '"');
@@ -720,6 +705,9 @@ namespace orc {
void TimestampColumnPrinter::reset(const ColumnVectorBatch& batch) {
ColumnPrinter::reset(batch);
- data = dynamic_cast<const LongVectorBatch&>(batch).data.data();
+ const TimestampVectorBatch& ts =
+ dynamic_cast<const TimestampVectorBatch&>(batch);
+ seconds = ts.data.data();
+ nanoseconds = ts.nanoseconds.data();
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/src/ColumnReader.cc
----------------------------------------------------------------------
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 78fb15f..7d21ce1 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -261,10 +261,11 @@ namespace orc {
numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : 0);
}
- class TimestampColumnReader: public IntegerColumnReader {
+ class TimestampColumnReader: public ColumnReader {
private:
+ std::unique_ptr<orc::RleDecoder> secondsRle;
std::unique_ptr<orc::RleDecoder> nanoRle;
- DataBuffer<int64_t> nanoBuffer;
+ const int64_t epochOffset;
public:
TimestampColumnReader(const Type& type, StripeStreams& stripe);
@@ -280,10 +281,13 @@ namespace orc {
TimestampColumnReader::TimestampColumnReader(const Type& type,
StripeStreams& stripe
- ): IntegerColumnReader(type,
- stripe),
- nanoBuffer(memoryPool, 1024){
+ ): ColumnReader(type, stripe),
+ epochOffset(stripe.getEpochOffset()) {
RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
+ secondsRle = createRleDecoder(stripe.getStream(columnId,
+ proto::Stream_Kind_DATA,
+ true),
+ true, vers, memoryPool);
nanoRle = createRleDecoder(stripe.getStream(columnId,
proto::Stream_Kind_SECONDARY,
true),
@@ -295,41 +299,37 @@ namespace orc {
}
uint64_t TimestampColumnReader::skip(uint64_t numValues) {
- numValues = IntegerColumnReader::skip(numValues);
+ numValues = ColumnReader::skip(numValues);
+ secondsRle->skip(numValues);
nanoRle->skip(numValues);
return numValues;
}
void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
- uint64_t numValues,
- char *notNull) {
+ uint64_t numValues,
+ char *notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
- int64_t* pStamp = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-
- // make sure that nanoBuffer is large enough
- if (numValues > nanoBuffer.size()) {
- nanoBuffer.resize(numValues);
- }
-
- rle->next(pStamp, numValues, notNull);
- nanoRle->next(nanoBuffer.data(), numValues, notNull);
+ TimestampVectorBatch& timestampBatch =
+ dynamic_cast<TimestampVectorBatch&>(rowBatch);
+ int64_t *secsBuffer = timestampBatch.data.data();
+ secondsRle->next(secsBuffer, numValues, notNull);
+ int64_t *nanoBuffer = timestampBatch.nanoseconds.data();
+ nanoRle->next(nanoBuffer, numValues, notNull);
// Construct the values
for(uint64_t i=0; i < numValues; i++) {
if (notNull == nullptr || notNull[i]) {
- int64_t nanosec = nanoBuffer[i] >> 3;
uint64_t zeros = nanoBuffer[i] & 0x7;
+ nanoBuffer[i] >>= 3;
if (zeros != 0) {
for(uint64_t j = 0; j <= zeros; ++j) {
- nanosec *= 10;
+ nanoBuffer[i] *= 10;
}
}
- pStamp[i] = pStamp[i] * 1000000000 + 1420070400000000000;
- if (pStamp[i] >= 0) {
- pStamp[i] += nanosec;
- } else {
- pStamp[i] -= nanosec;
+ secsBuffer[i] += epochOffset;
+ if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) {
+ secsBuffer[i] -= 1;
}
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/src/ColumnReader.hh
----------------------------------------------------------------------
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index b90c942..73db911 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -63,6 +63,13 @@ namespace orc {
* Get the memory pool for this reader.
*/
virtual MemoryPool& getMemoryPool() const = 0;
+
+ /**
+ * Get the number of seconds between the ORC epoch and Unix epoch.
+ * The ORC epoch is 1 Jan 2015 00:00:00 in the local time zone.
+ * Unix epoch is 1 Jan 1970 00:00:00 UTC.
+ */
+ virtual int64_t getEpochOffset() const = 0;
};
/**
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 684b9e4..b7cf00d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -854,6 +854,8 @@ namespace orc {
class ReaderImpl : public Reader {
private:
+ const int64_t epochOffset;
+
// inputs
std::unique_ptr<InputStream> stream;
ReaderOptions options;
@@ -993,12 +995,27 @@ namespace orc {
}
}
+ int64_t getEpochOffset() {
+ // Build the literal for the ORC epoch
+ // 2015 Jan 1 00:00:00
+ struct tm epoch;
+ epoch.tm_sec = 0;
+ epoch.tm_min = 0;
+ epoch.tm_hour = 0;
+ epoch.tm_mday = 1;
+ epoch.tm_mon = 0;
+ epoch.tm_year = 2015 - 1900;
+ epoch.tm_isdst = 0;
+ return static_cast<int64_t>(mktime(&epoch));
+ }
+
ReaderImpl::ReaderImpl(std::unique_ptr<InputStream> input,
const ReaderOptions& opts,
std::unique_ptr<proto::PostScript> _postscript,
std::unique_ptr<proto::Footer> _footer,
uint64_t _footerStart
- ): stream(std::move(input)),
+ ): epochOffset(getEpochOffset()),
+ stream(std::move(input)),
options(opts),
footerStart(_footerStart),
memoryPool(*opts.getMemoryPool()),
@@ -1334,13 +1351,15 @@ namespace orc {
const uint64_t stripeStart;
InputStream& input;
MemoryPool& memoryPool;
+ const int64_t epochOffset;
public:
StripeStreamsImpl(const ReaderImpl& reader,
const proto::StripeFooter& footer,
uint64_t stripeStart,
InputStream& input,
- MemoryPool& memoryPool);
+ MemoryPool& memoryPool,
+ int64_t epochOffset);
virtual ~StripeStreamsImpl();
@@ -1356,18 +1375,22 @@ namespace orc {
bool shouldStream) const override;
MemoryPool& getMemoryPool() const override;
+
+ int64_t getEpochOffset() const override;
};
StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader,
const proto::StripeFooter& _footer,
uint64_t _stripeStart,
InputStream& _input,
- MemoryPool& _memoryPool
+ MemoryPool& _memoryPool,
+ int64_t _epochOffset
): reader(_reader),
footer(_footer),
stripeStart(_stripeStart),
input(_input),
- memoryPool(_memoryPool) {
+ memoryPool(_memoryPool),
+ epochOffset(_epochOffset) {
// PASS
}
@@ -1383,10 +1406,15 @@ namespace orc {
return reader.getSelectedColumns();
}
- proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId) const {
+ proto::ColumnEncoding StripeStreamsImpl::getEncoding(int64_t columnId
+ ) const {
return footer.columns(static_cast<int>(columnId));
}
+ int64_t StripeStreamsImpl::getEpochOffset() const {
+ return epochOffset;
+ }
+
std::unique_ptr<SeekableInputStream>
StripeStreamsImpl::getStream(int64_t columnId,
proto::Stream_Kind kind,
@@ -1426,7 +1454,8 @@ namespace orc {
StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
currentStripeInfo.offset(),
*(stream.get()),
- memoryPool);
+ memoryPool,
+ epochOffset);
reader = buildReader(*(schema.get()), stripeStreams);
}
@@ -1479,7 +1508,6 @@ namespace orc {
case SHORT:
case INT:
case LONG:
- case TIMESTAMP:
case DATE:
result = new LongVectorBatch(capacity, memoryPool);
break;
@@ -1493,6 +1521,9 @@ namespace orc {
case VARCHAR:
result = new StringVectorBatch(capacity, memoryPool);
break;
+ case TIMESTAMP:
+ result = new TimestampVectorBatch(capacity, memoryPool);
+ break;
case STRUCT:
result = new StructVectorBatch(capacity, memoryPool);
for(uint64_t i=0; i < type.getSubtypeCount(); ++i) {
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/src/Vector.cc
----------------------------------------------------------------------
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index e65c69b..55167f4 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -303,4 +303,32 @@ namespace orc {
std::string Decimal::toString() const {
return value.toDecimalString(scale);
}
+
+ TimestampVectorBatch::TimestampVectorBatch(uint64_t capacity,
+ MemoryPool& pool
+ ): ColumnVectorBatch(capacity,
+ pool),
+ data(pool, capacity),
+ nanoseconds(pool, capacity) {
+ // PASS
+ }
+
+ TimestampVectorBatch::~TimestampVectorBatch() {
+ // PASS
+ }
+
+ std::string TimestampVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Timestamp vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ void TimestampVectorBatch::resize(uint64_t cap) {
+ if (capacity < cap) {
+ ColumnVectorBatch::resize(cap);
+ data.resize(cap);
+ nanoseconds.resize(cap);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/test/TestColumnPrinter.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestColumnPrinter.cc b/c++/test/TestColumnPrinter.cc
index a25bf3b..a2afdb3 100644
--- a/c++/test/TestColumnPrinter.cc
+++ b/c++/test/TestColumnPrinter.cc
@@ -128,21 +128,33 @@ namespace orc {
std::string line;
std::unique_ptr<Type> type = createPrimitiveType(TIMESTAMP);
std::unique_ptr<ColumnPrinter> printer = createColumnPrinter(line, *type);
- LongVectorBatch batch(1024, *getDefaultPool());
+ TimestampVectorBatch batch(1024, *getDefaultPool());
batch.numElements = 12;
batch.hasNulls = false;
- batch.data[0] = 1420070400000000000;
- batch.data[1] = 963270000000000000;
- batch.data[2] = 1426168859000000000;
- batch.data[3] = 1426168859000000001;
- batch.data[4] = 1426168859000000010;
- batch.data[5] = 1426168859000000100;
- batch.data[6] = 1426168859000001000;
- batch.data[7] = 1426168859000010000;
- batch.data[8] = 1426168859000100000;
- batch.data[9] = 1426168859001000000;
- batch.data[10] = 1426168859010000000;
- batch.data[11] = 1426168859100000000;
+ batch.data[0] = 1420099200;
+ batch.data[1] = 963298800;
+ batch.data[2] = 1426197659;
+ batch.data[3] = 1426197659;
+ batch.data[4] = 1426197659;
+ batch.data[5] = 1426197659;
+ batch.data[6] = 1426197659;
+ batch.data[7] = 1426197659;
+ batch.data[8] = 1426197659;
+ batch.data[9] = 1426197659;
+ batch.data[10] = 1426197659;
+ batch.data[11] = 1426197659;
+ batch.nanoseconds[0] = 0;
+ batch.nanoseconds[1] = 0;
+ batch.nanoseconds[2] = 0;
+ batch.nanoseconds[3] = 1;
+ batch.nanoseconds[4] = 10;
+ batch.nanoseconds[5] = 100;
+ batch.nanoseconds[6] = 1000;
+ batch.nanoseconds[7] = 10000;
+ batch.nanoseconds[8] = 100000;
+ batch.nanoseconds[9] = 1000000;
+ batch.nanoseconds[10] = 10000000;
+ batch.nanoseconds[11] = 100000000;
const char *expected[] = {"\"2015-01-01 00:00:00.0\"",
"\"2000-07-11 00:00:00.0\"",
"\"2015-03-12 15:00:59.0\"",
http://git-wip-us.apache.org/repos/asf/orc/blob/8b7189bd/c++/test/TestColumnReader.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index 4c669d0..41231bc 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -48,6 +48,11 @@ public:
MemoryPool& getMemoryPool() const {
return *getDefaultPool();
}
+
+ // the epoch offset for America/Los_Angeles
+ int64_t getEpochOffset() const {
+ return 1420099200;
+ }
};
MockStripeStreams::~MockStripeStreams() {
@@ -2634,17 +2639,19 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) {
std::unique_ptr<ColumnReader> reader =
buildReader(*rowType, streams);
- LongVectorBatch *longBatch = new LongVectorBatch(1024, *getDefaultPool());
+ TimestampVectorBatch *longBatch =
+ new TimestampVectorBatch(1024, *getDefaultPool());
StructVectorBatch batch(1024, *getDefaultPool());
batch.fields.push_back(longBatch);
- // Test values are nanoseconds since 1970-01-01 00:00:00.0
- int64_t test_vals[] = {
- 1368178850110000000, // 2013-05-10 10:40:50.11
- 1402483311120000000, // 2014-06-11 11:41:51.12
- 1436701372130000000, // 2015-07-12 12:42:52.13
- 1471092233140000000 // 2016-08-13 13:43:53.14
- };
+ const char *(expected[]) = {"Fri May 10 10:40:50 2013\n",
+ "Wed Jun 11 11:41:51 2014\n",
+ "Sun Jul 12 12:42:52 2015\n",
+ "Sat Aug 13 13:43:53 2016\n"};
+ int64_t expected_nano[] = {110000000,
+ 120000000,
+ 130000000,
+ 140000000};
int vals_ix = 0;
reader->next(batch, 3, 0);
@@ -2658,7 +2665,9 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) {
EXPECT_EQ(0, longBatch->notNull[i]);
} else {
EXPECT_EQ(1, longBatch->notNull[i]);
- EXPECT_EQ(test_vals[vals_ix], longBatch->data[i]);
+ time_t time = static_cast<time_t>(longBatch->data[i]);
+ EXPECT_STREQ(expected[vals_ix], ctime(&time));
+ EXPECT_EQ(expected_nano[vals_ix], longBatch->nanoseconds[i]);
vals_ix++;
}
}
@@ -2675,7 +2684,9 @@ TEST(TestColumnReader, testTimestampSkipWithNulls) {
EXPECT_EQ(0, longBatch->notNull[i]);
} else {
EXPECT_EQ(1, longBatch->notNull[i]);
- EXPECT_EQ(test_vals[vals_ix], longBatch->data[i]);
+ time_t time = static_cast<time_t>(longBatch->data[i]);
+ EXPECT_STREQ(expected[vals_ix], ctime(&time));
+ EXPECT_EQ(expected_nano[vals_ix], longBatch->nanoseconds[i]);
vals_ix++;
}
}
@@ -2739,22 +2750,31 @@ TEST(TestColumnReader, testTimestamp) {
std::unique_ptr<ColumnReader> reader =
buildReader(*rowType, streams);
- LongVectorBatch *longBatch = new LongVectorBatch(1024, *getDefaultPool());
+ TimestampVectorBatch *longBatch =
+ new TimestampVectorBatch(1024, *getDefaultPool());
StructVectorBatch batch(1024, *getDefaultPool());
batch.fields.push_back(longBatch);
- // Test values are nanoseconds since 1970-01-01 00:00:00.0
- const int64_t expected[] = {952873200000000000, // 2000-03-12 15:00:00.0
- 953553600123456789, // 2000-03-20 12:00:00.123456789
- -2208988800000000000, // 1900-01-01 00:00:00.0
- -2198229903190000000, // 1900-05-05 12:34:56.19
- -2166693903190100000, // 1901-05-05 12:34:56.1901
- -2135157903190200000, // 1902-05-05 12:34:56.1902
- -2103621903190300000, // 1903-05-05 12:34:56.1903
- -2071999503190400000, // 1904-05-05 12:34:56.1904
- -2040463503190500000, // 1905-05-05 12:34:56.1905
- -1882697103191000000 // 1910-05-05 12:34:56.191
- };
+ const char *(expected[]) = {"Sun Mar 12 15:00:00 2000\n",
+ "Mon Mar 20 12:00:00 2000\n",
+ "Mon Jan 1 00:00:00 1900\n",
+ "Sat May 5 12:34:56 1900\n",
+ "Sun May 5 12:34:56 1901\n",
+ "Mon May 5 12:34:56 1902\n",
+ "Tue May 5 12:34:56 1903\n",
+ "Thu May 5 12:34:56 1904\n",
+ "Fri May 5 12:34:56 1905\n",
+ "Thu May 5 12:34:56 1910\n"};
+ const int64_t expectedNano[] = {0,
+ 123456789,
+ 0,
+ 190000000,
+ 190100000,
+ 190200000,
+ 190300000,
+ 190400000,
+ 190500000,
+ 191000000};
reader->next(batch, 10, 0);
ASSERT_EQ(10, batch.numElements);
@@ -2763,7 +2783,9 @@ TEST(TestColumnReader, testTimestamp) {
ASSERT_EQ(true, !longBatch->hasNulls);
for (size_t i = 0; i < batch.numElements; ++i) {
- EXPECT_EQ(expected[i], longBatch->data[i]) << "Wrong value at " << i;
+ time_t time = static_cast<time_t>(longBatch->data[i]);
+ EXPECT_STREQ(expected[i], ctime(&time)) << "Wrong value at " << i;
+ EXPECT_EQ(expectedNano[i], longBatch->nanoseconds[i]);
}
}