You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the conversion to plain text.
Posted to commits@orc.apache.org by om...@apache.org on 2018/05/07 17:04:03 UTC
orc git commit: ORC-341: Support time zone as a parameter for Java reader and writer
Repository: orc
Updated Branches:
refs/heads/master c4c7b28f6 -> aa790d4e9
ORC-341: Support time zone as a parameter for Java reader and writer
Fixes #249
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/aa790d4e
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/aa790d4e
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/aa790d4e
Branch: refs/heads/master
Commit: aa790d4e9355a0f93941b3d6c0e8ea12e8a8ea5f
Parents: c4c7b28
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 2 14:26:46 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon May 7 10:03:28 2018 -0700
----------------------------------------------------------------------
java/core/src/java/org/apache/orc/OrcFile.java | 25 ++++
.../apache/orc/TimestampColumnStatistics.java | 12 ++
.../apache/orc/impl/ColumnStatisticsImpl.java | 14 ++
.../java/org/apache/orc/impl/ReaderImpl.java | 2 +
.../org/apache/orc/impl/RecordReaderImpl.java | 139 ++++++++++++++++---
.../org/apache/orc/impl/TreeReaderFactory.java | 22 ++-
.../java/org/apache/orc/impl/WriterImpl.java | 14 +-
.../orc/impl/writer/TimestampTreeWriter.java | 40 +++++-
.../apache/orc/impl/writer/WriterContext.java | 2 +
.../apache/orc/impl/writer/WriterImplV2.java | 14 +-
.../test/org/apache/orc/TestOrcTimezone4.java | 113 +++++++++++++++
.../apache/orc/impl/TestRecordReaderImpl.java | 74 +++++++---
12 files changed, 418 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index ac0beff..b07355a 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -276,6 +276,7 @@ public class OrcFile {
// and remove this class altogether. Both footer caching and llap caching just needs OrcTail.
// For now keeping this around to avoid complex surgery
private FileMetadata fileMetadata;
+ private boolean useUTCTimestamp;
public ReaderOptions(Configuration conf) {
this.conf = conf;
@@ -320,6 +321,16 @@ public class OrcFile {
public FileMetadata getFileMetadata() {
return fileMetadata;
}
+
+ public ReaderOptions useUTCTimestamp(boolean value) {
+ useUTCTimestamp = value;
+ return this;
+ }
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimestamp;
+ }
+
}
public static ReaderOptions readerOptions(Configuration conf) {
@@ -392,6 +403,7 @@ public class OrcFile {
private BloomFilterVersion bloomFilterVersion;
private PhysicalWriter physicalWriter;
private WriterVersion writerVersion = CURRENT_WRITER;
+ private boolean useUTCTimestamp;
private boolean overwrite;
private boolean writeVariableLengthBlocks;
private HadoopShims shims;
@@ -666,6 +678,15 @@ public class OrcFile {
return this;
}
+ /**
+ * Manually set the time zone for the writer to utc.
+ * If not defined, system time zone is assumed.
+ */
+ public WriterOptions useUTCTimestamp(boolean value) {
+ useUTCTimestamp = value;
+ return this;
+ }
+
public boolean getBlockPadding() {
return blockPaddingValue;
}
@@ -761,6 +782,10 @@ public class OrcFile {
public HadoopShims getHadoopShims() {
return shims;
}
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimestamp;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java b/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java
index 27dc49f..b095a68 100644
--- a/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java
+++ b/java/core/src/java/org/apache/orc/TimestampColumnStatistics.java
@@ -35,4 +35,16 @@ public interface TimestampColumnStatistics extends ColumnStatistics {
* @return maximum value
*/
Timestamp getMaximum();
+
+ /**
+ * Get the minimum value for the column in UTC.
+ * @return minimum value in UTC
+ */
+ Timestamp getMinimumUTC();
+
+ /**
+ * Get the maximum value for the column in UTC.
+ * @return maximum value in UTC
+ */
+ Timestamp getMaximumUTC();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
index 98a4e17..22e1842 100644
--- a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
@@ -1430,6 +1430,16 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
+ public Timestamp getMinimumUTC() {
+ return minimum == null ? null : new Timestamp(minimum);
+ }
+
+ @Override
+ public Timestamp getMaximumUTC() {
+ return maximum == null ? null : new Timestamp(maximum);
+ }
+
+ @Override
public String toString() {
StringBuilder buf = new StringBuilder(super.toString());
if (minimum != null || maximum != null) {
@@ -1437,6 +1447,10 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
buf.append(getMinimum());
buf.append(" max: ");
buf.append(getMaximum());
+ buf.append(" min UTC: ");
+ buf.append(getMinimumUTC());
+ buf.append(" max UTC: ");
+ buf.append(getMaximumUTC());
}
return buf.toString();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index feb8160..ed597d5 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -77,6 +77,7 @@ public class ReaderImpl implements Reader {
private long deserializedSize = -1;
protected final Configuration conf;
+ protected final boolean useUTCTimestamp;
private final List<Integer> versionList;
private final OrcFile.WriterVersion writerVersion;
@@ -344,6 +345,7 @@ public class ReaderImpl implements Reader {
this.path = path;
this.conf = options.getConfiguration();
this.maxLength = options.getMaxLength();
+ this.useUTCTimestamp = options.getUseUTCTimestamp();
FileMetadata fileMetadata = options.getFileMetadata();
if (fileMetadata != null) {
this.compressionKind = fileMetadata.getCompressionKind();
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index abb487e..8bb4d9a 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -210,12 +210,14 @@ public class RecordReaderImpl implements RecordReader {
SearchArgument sarg = options.getSearchArgument();
if (sarg != null && rowIndexStride != 0) {
sargApp = new SargApplier(sarg,
- rowIndexStride,
- evolution,
- writerVersion);
+ rowIndexStride,
+ evolution,
+ writerVersion,
+ fileReader.useUTCTimestamp);
} else {
sargApp = null;
}
+
long rows = 0;
long skippedRows = 0;
long offset = options.getOffset();
@@ -259,7 +261,8 @@ public class RecordReaderImpl implements RecordReader {
new TreeReaderFactory.ReaderContext()
.setSchemaEvolution(evolution)
.skipCorrupt(skipCorrupt)
- .fileFormat(fileReader.getFileVersion());
+ .fileFormat(fileReader.getFileVersion())
+ .useUTCTimestamp(fileReader.useUTCTimestamp);
reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
readerContext);
@@ -330,6 +333,19 @@ public class RecordReaderImpl implements RecordReader {
* @return the object for the maximum value or null if there isn't one
*/
static Object getMax(ColumnStatistics index) {
+ return getMax(index, false);
+ }
+
+ /**
+ * Get the maximum value out of an index entry.
+ * Includes option to specify if timestamp column stats values
+ * should be in UTC.
+ * @param index
+ * the index entry
+ * @param useUTCTimestamp
+ * @return the object for the maximum value or null if there isn't one
+ */
+ static Object getMax(ColumnStatistics index, boolean useUTCTimestamp) {
if (index instanceof IntegerColumnStatistics) {
return ((IntegerColumnStatistics) index).getMaximum();
} else if (index instanceof DoubleColumnStatistics) {
@@ -341,7 +357,11 @@ public class RecordReaderImpl implements RecordReader {
} else if (index instanceof DecimalColumnStatistics) {
return ((DecimalColumnStatistics) index).getMaximum();
} else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMaximum();
+ if (useUTCTimestamp) {
+ return ((TimestampColumnStatistics) index).getMaximumUTC();
+ } else {
+ return ((TimestampColumnStatistics) index).getMaximum();
+ }
} else if (index instanceof BooleanColumnStatistics) {
if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
return Boolean.TRUE;
@@ -360,6 +380,19 @@ public class RecordReaderImpl implements RecordReader {
* @return the object for the minimum value or null if there isn't one
*/
static Object getMin(ColumnStatistics index) {
+ return getMin(index, false);
+ }
+
+ /**
+ * Get the minimum value out of an index entry.
+ * Includes option to specify if timestamp column stats values
+ * should be in UTC.
+ * @param index
+ * the index entry
+ * @param useUTCTimestamp
+ * @return the object for the minimum value or null if there isn't one
+ */
+ static Object getMin(ColumnStatistics index, boolean useUTCTimestamp) {
if (index instanceof IntegerColumnStatistics) {
return ((IntegerColumnStatistics) index).getMinimum();
} else if (index instanceof DoubleColumnStatistics) {
@@ -371,7 +404,11 @@ public class RecordReaderImpl implements RecordReader {
} else if (index instanceof DecimalColumnStatistics) {
return ((DecimalColumnStatistics) index).getMinimum();
} else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMinimum();
+ if (useUTCTimestamp) {
+ return ((TimestampColumnStatistics) index).getMinimumUTC();
+ } else {
+ return ((TimestampColumnStatistics) index).getMinimum();
+ }
} else if (index instanceof BooleanColumnStatistics) {
if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
return Boolean.FALSE;
@@ -401,9 +438,35 @@ public class RecordReaderImpl implements RecordReader {
OrcProto.BloomFilter bloomFilter,
OrcFile.WriterVersion writerVersion,
TypeDescription.Category type) {
+ return evaluatePredicateProto(statsProto, predicate, kind, encoding, bloomFilter,
+ writerVersion, type, false);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * Includes option to specify if timestamp column stats values
+ * should be in UTC.
+ * @param statsProto the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @param bloomFilter the bloom filter
+ * @param writerVersion the version of software that wrote the file
+ * @param type what is the kind of this column
+ * @param useUTCTimestamp
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
+ PredicateLeaf predicate,
+ OrcProto.Stream.Kind kind,
+ OrcProto.ColumnEncoding encoding,
+ OrcProto.BloomFilter bloomFilter,
+ OrcFile.WriterVersion writerVersion,
+ TypeDescription.Category type,
+ boolean useUTCTimestamp) {
ColumnStatistics cs = ColumnStatisticsImpl.deserialize(null, statsProto);
- Object minValue = getMin(cs);
- Object maxValue = getMax(cs);
+ Object minValue = getMin(cs, useUTCTimestamp);
+ Object maxValue = getMax(cs, useUTCTimestamp);
// files written before ORC-135 stores timestamp wrt to local timezone causing issues with PPD.
// disable PPD for timestamp for all old files
if (type.equals(TypeDescription.Category.TIMESTAMP)) {
@@ -420,7 +483,8 @@ public class RecordReaderImpl implements RecordReader {
}
}
return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(),
- BloomFilterIO.deserialize(kind, encoding, writerVersion, type, bloomFilter));
+ BloomFilterIO.deserialize(kind, encoding, writerVersion, type, bloomFilter),
+ useUTCTimestamp);
}
/**
@@ -434,13 +498,32 @@ public class RecordReaderImpl implements RecordReader {
public static TruthValue evaluatePredicate(ColumnStatistics stats,
PredicateLeaf predicate,
BloomFilter bloomFilter) {
- Object minValue = getMin(stats);
- Object maxValue = getMax(stats);
- return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
+ return evaluatePredicate(stats, predicate, bloomFilter, false);
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * Includes option to specify if timestamp column stats values
+ * should be in UTC.
+ * @param stats the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @param bloomFilter
+ * @param useUTCTimestamp
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ public static TruthValue evaluatePredicate(ColumnStatistics stats,
+ PredicateLeaf predicate,
+ BloomFilter bloomFilter,
+ boolean useUTCTimestamp) {
+ Object minValue = getMin(stats, useUTCTimestamp);
+ Object maxValue = getMax(stats, useUTCTimestamp);
+ return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter, useUTCTimestamp);
}
static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
- Object max, boolean hasNull, BloomFilter bloomFilter) {
+ Object max, boolean hasNull, BloomFilter bloomFilter, boolean useUTCTimestamp) {
// if we didn't have any values, everything must have been null
if (min == null) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
@@ -461,7 +544,7 @@ public class RecordReaderImpl implements RecordReader {
result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
- return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
+ return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull, useUTCTimestamp);
} else {
return result;
}
@@ -580,18 +663,18 @@ public class RecordReaderImpl implements RecordReader {
}
private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
- final Object predObj, BloomFilter bloomFilter, boolean hasNull) {
+ final Object predObj, BloomFilter bloomFilter, boolean hasNull, boolean useUTCTimestamp) {
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
// null safe equals does not return *_NULL variant. So set hasNull to false
- return checkInBloomFilter(bloomFilter, predObj, false);
+ return checkInBloomFilter(bloomFilter, predObj, false, useUTCTimestamp);
case EQUALS:
- return checkInBloomFilter(bloomFilter, predObj, hasNull);
+ return checkInBloomFilter(bloomFilter, predObj, hasNull, useUTCTimestamp);
case IN:
for (Object arg : predicate.getLiteralList()) {
// if atleast one value in IN list exist in bloom filter, qualify the row group/stripe
Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
- TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
+ TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull, useUTCTimestamp);
if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
return result;
}
@@ -602,7 +685,7 @@ public class RecordReaderImpl implements RecordReader {
}
}
- private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull) {
+ private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull, boolean useUTCTimestamp) {
TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
if (predObj instanceof Long) {
@@ -620,8 +703,14 @@ public class RecordReaderImpl implements RecordReader {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Timestamp) {
- if (bf.testLong(SerializationUtils.convertToUtc(TimeZone.getDefault(), ((Timestamp) predObj).getTime()))) {
- result = TruthValue.YES_NO_NULL;
+ if (useUTCTimestamp) {
+ if (bf.testLong(((Timestamp) predObj).getTime())) {
+ result = TruthValue.YES_NO_NULL;
+ }
+ } else {
+ if (bf.testLong(SerializationUtils.convertToUtc(TimeZone.getDefault(), ((Timestamp) predObj).getTime()))) {
+ result = TruthValue.YES_NO_NULL;
+ }
}
} else if (predObj instanceof Date) {
if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
@@ -773,11 +862,13 @@ public class RecordReaderImpl implements RecordReader {
private final boolean[] sargColumns;
private SchemaEvolution evolution;
private final long[] exceptionCount;
+ private final boolean useUTCTimestamp;
public SargApplier(SearchArgument sarg,
long rowIndexStride,
SchemaEvolution evolution,
- OrcFile.WriterVersion writerVersion) {
+ OrcFile.WriterVersion writerVersion,
+ boolean useUTCTimestamp) {
this.writerVersion = writerVersion;
this.sarg = sarg;
sargLeaves = sarg.getLeaves();
@@ -796,6 +887,7 @@ public class RecordReaderImpl implements RecordReader {
}
this.evolution = evolution;
exceptionCount = new long[sargLeaves.size()];
+ this.useUTCTimestamp = useUTCTimestamp;
}
/**
@@ -847,7 +939,8 @@ public class RecordReaderImpl implements RecordReader {
leafValues[pred] = evaluatePredicateProto(stats,
predicate, bfk, encodings.get(columnIx), bf,
writerVersion, evolution.getFileSchema().
- findSubtype(columnIx).getCategory());
+ findSubtype(columnIx).getCategory(),
+ useUTCTimestamp);
} catch (Exception e) {
exceptionCount[pred] += 1;
if (e instanceof SargCastException) {
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index ccae522..b0fd5a7 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -19,7 +19,6 @@ package org.apache.orc.impl;
import java.io.EOFException;
import java.io.IOException;
-import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -57,6 +56,8 @@ public class TreeReaderFactory {
boolean isSkipCorrupt();
+ boolean getUseUTCTimestamp();
+
String getWriterTimezone();
OrcFile.Version getFileFormat();
@@ -65,6 +66,7 @@ public class TreeReaderFactory {
public static class ReaderContext implements Context {
private SchemaEvolution evolution;
private boolean skipCorrupt = false;
+ private boolean useUTCTimestamp = false;
private String writerTimezone;
private OrcFile.Version fileFormat;
@@ -78,6 +80,11 @@ public class TreeReaderFactory {
return this;
}
+ public ReaderContext useUTCTimestamp(boolean useUTCTimestamp) {
+ this.useUTCTimestamp = useUTCTimestamp;
+ return this;
+ }
+
public ReaderContext writerTimeZone(String writerTimezone) {
this.writerTimezone = writerTimezone;
return this;
@@ -99,6 +106,11 @@ public class TreeReaderFactory {
}
@Override
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimestamp;
+ }
+
+ @Override
public String getWriterTimezone() {
return writerTimezone;
}
@@ -900,7 +912,11 @@ public class TreeReaderFactory {
this.threadLocalDateFormat = new ThreadLocal<>();
this.threadLocalDateFormat.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
this.baseTimestampMap = new HashMap<>();
- this.readerTimeZone = TimeZone.getDefault();
+ if (context.getUseUTCTimestamp()) {
+ this.readerTimeZone = TimeZone.getTimeZone("UTC");
+ } else {
+ this.readerTimeZone = TimeZone.getDefault();
+ }
if (context.getWriterTimezone() == null || context.getWriterTimezone().isEmpty()) {
this.writerTimeZone = readerTimeZone;
} else {
@@ -990,6 +1006,8 @@ public class TreeReaderFactory {
TimestampColumnVector result = (TimestampColumnVector) previousVector;
super.nextVector(previousVector, isNull, batchSize);
+ result.setIsUTC(context.getUseUTCTimestamp());
+
for (int i = 0; i < batchSize; i++) {
if (result.noNulls || !result.isNull[i]) {
final int newNanos = parseNanos(nanos.next());
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 7a4a0b9..d6239f2 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -112,6 +112,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private final double bloomFilterFpp;
private final OrcFile.BloomFilterVersion bloomFilterVersion;
private final boolean writeTimeZone;
+ private final boolean useUTCTimeZone;
public WriterImpl(FileSystem fs,
Path path,
@@ -133,7 +134,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
} else {
callbackContext = null;
}
- writeTimeZone = hasTimestamp(schema);
+ this.writeTimeZone = hasTimestamp(schema);
+ this.useUTCTimeZone = opts.getUseUTCTimestamp();
this.adjustedStripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
@@ -404,6 +406,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
physicalWriter.writeBloomFilter(name, bloom,
getCustomizedCodec(name.getKind()));
}
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimeZone;
+ }
}
@@ -431,7 +437,11 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
OrcProto.StripeFooter.Builder builder =
OrcProto.StripeFooter.newBuilder();
if (writeTimeZone) {
- builder.setWriterTimezone(TimeZone.getDefault().getID());
+ if (useUTCTimeZone) {
+ builder.setWriterTimezone("UTC");
+ } else {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
}
OrcProto.StripeStatistics.Builder stats =
OrcProto.StripeStatistics.newBuilder();
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
index a7bfc90..0f30d07 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -28,7 +28,9 @@ import org.apache.orc.impl.PositionRecorder;
import org.apache.orc.impl.SerializationUtils;
import java.io.IOException;
-import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.TimeZone;
public class TimestampTreeWriter extends TreeWriterBase {
@@ -38,8 +40,10 @@ public class TimestampTreeWriter extends TreeWriterBase {
private final IntegerWriter seconds;
private final IntegerWriter nanos;
private final boolean isDirectV2;
+ private boolean useUTCTimestamp;
private final TimeZone localTimezone;
private final long baseEpochSecsLocalTz;
+ private final long baseEpochSecsUTC;
public TimestampTreeWriter(int columnId,
TypeDescription schema,
@@ -54,9 +58,25 @@ public class TimestampTreeWriter extends TreeWriterBase {
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
}
+ this.useUTCTimestamp = writer.getUseUTCTimestamp();
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
this.localTimezone = TimeZone.getDefault();
- // for unit tests to set different time zones
- this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+ dateFormat.setTimeZone(this.localTimezone);
+ try {
+ this.baseEpochSecsLocalTz = dateFormat
+ .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+ TimestampTreeWriter.MILLIS_PER_SECOND;
+ } catch (ParseException e) {
+ throw new IOException("Unable to create base timestamp tree writer", e);
+ }
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ try {
+ this.baseEpochSecsUTC = dateFormat
+ .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+ TimestampTreeWriter.MILLIS_PER_SECOND;
+ } catch (ParseException e) {
+ throw new IOException("Unable to create base timestamp tree writer", e);
+ }
}
@Override
@@ -85,7 +105,8 @@ public class TimestampTreeWriter extends TreeWriterBase {
if (millis < 0 && newNanos > 999_999) {
millis -= MILLIS_PER_SECOND;
}
- long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+ long utc = vec.isUTC() ?
+ millis : SerializationUtils.convertToUtc(localTimezone, millis);
indexStatistics.updateTimestamp(utc);
if (createBloomFilter) {
if (bloomFilter != null) {
@@ -95,7 +116,7 @@ public class TimestampTreeWriter extends TreeWriterBase {
}
final long nano = formatNanos(vec.nanos[0]);
for (int i = 0; i < length; ++i) {
- seconds.write(secs - baseEpochSecsLocalTz);
+ seconds.write(secs - (useUTCTimestamp ? baseEpochSecsUTC : baseEpochSecsLocalTz));
nanos.write(nano);
}
}
@@ -110,8 +131,13 @@ public class TimestampTreeWriter extends TreeWriterBase {
if (millis < 0 && newNanos > 999_999) {
millis -= MILLIS_PER_SECOND;
}
- long utc = SerializationUtils.convertToUtc(localTimezone, millis);
- seconds.write(secs - baseEpochSecsLocalTz);
+ long utc = vec.isUTC() ?
+ millis : SerializationUtils.convertToUtc(localTimezone, millis);
+ if (useUTCTimestamp) {
+ seconds.write(secs - baseEpochSecsUTC);
+ } else {
+ seconds.write(secs - baseEpochSecsLocalTz);
+ }
nanos.write(formatNanos(newNanos));
indexStatistics.updateTimestamp(utc);
if (createBloomFilter) {
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
index e32c683..1c8ca1a 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -101,4 +101,6 @@ public interface WriterContext {
void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom
) throws IOException;
+
+ boolean getUseUTCTimestamp();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
index 668ecc2..e1f410c 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
@@ -111,6 +111,7 @@ public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
private final double bloomFilterFpp;
private final OrcFile.BloomFilterVersion bloomFilterVersion;
private final boolean writeTimeZone;
+ private final boolean useUTCTimeZone;
public WriterImplV2(FileSystem fs,
Path path,
@@ -132,7 +133,8 @@ public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
} else {
callbackContext = null;
}
- writeTimeZone = hasTimestamp(schema);
+ this.writeTimeZone = hasTimestamp(schema);
+ this.useUTCTimeZone = opts.getUseUTCTimestamp();
this.adjustedStripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
@@ -346,6 +348,10 @@ public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
physicalWriter.writeBloomFilter(name, bloom,
getCustomizedCodec(name.getKind()));
}
+
+ public boolean getUseUTCTimestamp() {
+ return useUTCTimeZone;
+ }
}
@@ -373,7 +379,11 @@ public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
OrcProto.StripeFooter.Builder builder =
OrcProto.StripeFooter.newBuilder();
if (writeTimeZone) {
- builder.setWriterTimezone(TimeZone.getDefault().getID());
+ if (useUTCTimeZone) {
+ builder.setWriterTimezone("UTC");
+ } else {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
}
OrcProto.StripeStatistics.Builder stats =
OrcProto.StripeStatistics.newBuilder();
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/test/org/apache/orc/TestOrcTimezone4.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestOrcTimezone4.java b/java/core/src/test/org/apache/orc/TestOrcTimezone4.java
new file mode 100644
index 0000000..d23f804
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestOrcTimezone4.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimeZone;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ *
+ */
+public class TestOrcTimezone4 {
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ static TimeZone defaultTimeZone = TimeZone.getDefault();
+
+ public TestOrcTimezone4() {
+ }
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcTimezone4." +
+ testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ @After
+ public void restoreTimeZone() {
+ TimeZone.setDefault(defaultTimeZone);
+ }
+
+ @Test
+ public void testTimestampWriter() throws Exception {
+ TypeDescription schema = TypeDescription.createTimestamp();
+
+ TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
+
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+ .bufferSize(10000));
+ List<String> ts = Lists.newArrayList();
+ ts.add("1969-12-31 15:59:56.007");
+ ts.add("1969-12-31 16:00:14.007");
+ ts.add("1969-12-31 16:00:06.021");
+ VectorizedRowBatch batch = schema.createRowBatch();
+ TimestampColumnVector times = (TimestampColumnVector) batch.cols[0];
+ for (String t : ts) {
+ long time = formatter.parse(t).getTime();
+ times.set(batch.size++, new Timestamp(time));
+ }
+ writer.addRowBatch(batch);
+ writer.close();
+
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs).useUTCTimestamp(true));
+ formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+ RecordReader rows = reader.rows();
+ batch = reader.getSchema().createRowBatch();
+ times = (TimestampColumnVector) batch.cols[0];
+ int idx = 0;
+ while (rows.nextBatch(batch)) {
+ for(int r=0; r < batch.size; ++r) {
+ Timestamp timestamp = times.asScratchTimestamp(r);
+ assertEquals(ts.get(idx++), formatter.format(timestamp));
+ }
+ }
+ rows.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/aa790d4e/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index ae8bab5..1cf01de 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -38,6 +38,7 @@ import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -440,7 +441,8 @@ public class TestRecordReaderImpl {
static TruthValue evaluateTimestamp(OrcProto.ColumnStatistics stats,
PredicateLeaf predicate,
- boolean include135) {
+ boolean include135,
+ boolean useUTCTimestamp) {
OrcProto.ColumnEncoding encoding =
OrcProto.ColumnEncoding.newBuilder()
.setKind(OrcProto.ColumnEncoding.Kind.DIRECT)
@@ -448,13 +450,14 @@ public class TestRecordReaderImpl {
return RecordReaderImpl.evaluatePredicateProto(stats, predicate, null,
encoding, null,
include135 ? OrcFile.WriterVersion.ORC_135: OrcFile.WriterVersion.ORC_101,
- TypeDescription.Category.TIMESTAMP);
+ TypeDescription.Category.TIMESTAMP, useUTCTimestamp);
}
static TruthValue evaluateTimestampBloomfilter(OrcProto.ColumnStatistics stats,
PredicateLeaf predicate,
BloomFilter bloom,
- OrcFile.WriterVersion version) {
+ OrcFile.WriterVersion version,
+ boolean useUTCTimestamp) {
OrcProto.ColumnEncoding.Builder encoding =
OrcProto.ColumnEncoding.newBuilder()
.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
@@ -470,7 +473,7 @@ public class TestRecordReaderImpl {
BloomFilterIO.serialize(builder, bloom);
return RecordReaderImpl.evaluatePredicateProto(stats, predicate, kind,
encoding.build(), builder.build(), version,
- TypeDescription.Category.TIMESTAMP);
+ TypeDescription.Category.TIMESTAMP, useUTCTimestamp);
}
@Test
@@ -756,44 +759,44 @@ public class TestRecordReaderImpl {
"x", Timestamp.valueOf("2017-01-01 00:00:00"), null);
assertEquals(TruthValue.YES_NO,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00",
- "2018-01-01 00:00:00"), pred, true));
+ "2018-01-01 00:00:00"), pred, true, false));
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.FLOAT, "x", 15.0, null);
assertEquals(TruthValue.YES_NO_NULL,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
assertEquals(TruthValue.YES_NO_NULL,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
// pre orc-135 should always be yes_no_null.
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.TIMESTAMP, "x", Timestamp.valueOf("2017-01-01 00:00:00"), null);
assertEquals(TruthValue.YES_NO_NULL,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2017-01-01 00:00:00"),
- pred, false));
+ pred, false, false));
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.STRING, "x", Timestamp.valueOf("2017-01-01 00:00:00").toString(), null);
assertEquals(TruthValue.YES_NO,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2018-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.DATE, "x", Date.valueOf("2016-01-01"), null);
assertEquals(TruthValue.NO,
evaluateTimestamp(createTimestampStats("2017-01-01 00:00:00", "2017-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
assertEquals(TruthValue.YES_NO,
evaluateTimestamp(createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
pred = createPredicateLeaf(PredicateLeaf.Operator.NULL_SAFE_EQUALS,
PredicateLeaf.Type.DECIMAL, "x", new HiveDecimalWritable("15"), null);
assertEquals(TruthValue.YES_NO_NULL,
evaluateTimestamp(createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00"),
- pred, true));
+ pred, true, false));
}
@Test
@@ -1075,13 +1078,50 @@ public class TestRecordReaderImpl {
"x", Timestamp.valueOf("2000-01-01 00:00:00"), null);
OrcProto.ColumnStatistics cs = createTimestampStats("2000-01-01 00:00:00", "2001-01-01 00:00:00");
assertEquals(TruthValue.YES_NO_NULL,
- evaluateTimestampBloomfilter(cs, pred, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_101));
+ evaluateTimestampBloomfilter(cs, pred, new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_101, false));
BloomFilterUtf8 bf = new BloomFilterUtf8(10, 0.05);
bf.addLong(getUtcTimestamp("2000-06-01 00:00:00"));
assertEquals(TruthValue.NO_NULL,
- evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135));
+ evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_135, false));
assertEquals(TruthValue.YES_NO_NULL,
- evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_101));
+ evaluateTimestampBloomfilter(cs, pred, bf, OrcFile.WriterVersion.ORC_101, false));
+ }
+
+ @Test
+ public void testTimestampUTC() throws Exception {
+ // Predicate literals are parsed as UTC wall-clock values.
+ DateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ utc.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Timestamp atMin = new Timestamp(utc.parse("2015-01-01 00:00:00").getTime());
+ PredicateLeaf equalsMin = createPredicateLeaf(PredicateLeaf.Operator.EQUALS,
+ PredicateLeaf.Type.TIMESTAMP, "x", atMin, null);
+ Timestamp beforeMin = new Timestamp(utc.parse("2014-12-31 23:59:59").getTime());
+ PredicateLeaf equalsBeforeMin = createPredicateLeaf(PredicateLeaf.Operator.EQUALS,
+ PredicateLeaf.Type.TIMESTAMP, "x", beforeMin, null);
+ Timestamp afterMax = new Timestamp(utc.parse("2016-01-01 00:00:01").getTime());
+ PredicateLeaf equalsAfterMax = createPredicateLeaf(PredicateLeaf.Operator.EQUALS,
+ PredicateLeaf.Type.TIMESTAMP, "x", afterMax, null);
+ OrcProto.ColumnStatistics stats =
+ createTimestampStats("2015-01-01 00:00:00", "2016-01-01 00:00:00");
+
+ // Statistics only (include135 = true, useUTCTimestamp = true).
+ assertEquals(TruthValue.YES_NO_NULL, evaluateTimestamp(stats, equalsMin, true, true));
+ assertEquals(TruthValue.NO_NULL, evaluateTimestamp(stats, equalsBeforeMin, true, true));
+ assertEquals(TruthValue.NO_NULL, evaluateTimestamp(stats, equalsAfterMax, true, true));
+
+ // Statistics plus a freshly created (empty) ORC_135 bloom filter.
+ assertEquals(TruthValue.NO_NULL, evaluateTimestampBloomfilter(stats, equalsMin,
+ new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_135, true));
+ assertEquals(TruthValue.NO_NULL, evaluateTimestampBloomfilter(stats, equalsBeforeMin,
+ new BloomFilterUtf8(10000, 0.01), OrcFile.WriterVersion.ORC_135, true));
+
+ // A filter holding only a different value is still expected to exclude the literal ...
+ BloomFilterUtf8 filter = new BloomFilterUtf8(10, 0.05);
+ filter.addLong(getUtcTimestamp("2015-06-01 00:00:00"));
+ assertEquals(TruthValue.NO_NULL,
+ evaluateTimestampBloomfilter(stats, equalsMin, filter, OrcFile.WriterVersion.ORC_135, true));
+
+ // ... and once the literal itself is added, the expected result becomes YES_NO_NULL.
+ filter.addLong(getUtcTimestamp("2015-01-01 00:00:00"));
+ assertEquals(TruthValue.YES_NO_NULL,
+ evaluateTimestampBloomfilter(stats, equalsMin, filter, OrcFile.WriterVersion.ORC_135, true));
 }
private static long getUtcTimestamp(String ts) {
@@ -1978,7 +2018,7 @@ public class TestRecordReaderImpl {
.end().build();
RecordReaderImpl.SargApplier applier =
new RecordReaderImpl.SargApplier(sarg, 1000, evolution,
- OrcFile.WriterVersion.ORC_135);
+ OrcFile.WriterVersion.ORC_135, false);
OrcProto.StripeInformation stripe =
OrcProto.StripeInformation.newBuilder().setNumberOfRows(4000).build();
OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3];
@@ -2026,7 +2066,7 @@ public class TestRecordReaderImpl {
.end().build();
RecordReaderImpl.SargApplier applier =
new RecordReaderImpl.SargApplier(sarg, 1000, evolution,
- OrcFile.WriterVersion.ORC_135);
+ OrcFile.WriterVersion.ORC_135, false);
OrcProto.StripeInformation stripe =
OrcProto.StripeInformation.newBuilder().setNumberOfRows(3000).build();
OrcProto.RowIndex[] indexes = new OrcProto.RowIndex[3];