You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/05 08:50:05 UTC
[kylin] branch master updated: fix bug: real-time stream don't
support GMT-N timezone and other non-derived time columns
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 88205b9 fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns
88205b9 is described below
commit 88205b94bf5fb7451e01dd95fc08b35aade04570
Author: liukun4515 <li...@apache.org>
AuthorDate: Sat Feb 6 20:00:28 2021 +0800
fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns
---
.../apache/kylin/storage/gtrecord/CubeTupleConverter.java | 12 +++++++-----
.../kylin/stream/core/query/StreamingTupleConverter.java | 12 ++++++++----
.../kylin/stream/source/kafka/TimedJsonStreamParser.java | 9 ++++++++-
3 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 9911b51..5595f5a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -71,7 +71,7 @@ public class CubeTupleConverter implements ITupleConverter {
public final List<Integer> advMeasureIndexInGTValues;
private List<ILookupTable> usedLookupTables;
- final Set<Integer> timestampColumn = new HashSet<>();
+ final Set<Integer> needAdjustTimeColumns = new HashSet<>();
String eventTimezone;
boolean autoJustByTimezone;
private final long timeZoneOffset;
@@ -113,9 +113,11 @@ public class CubeTupleConverter implements ITupleConverter {
// pre-calculate dimension index mapping to tuple
for (TblColRef dim : selectedDimensions) {
tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
- if (TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())
- && !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
- timestampColumn.add(tupleIdx[i]);
+ // All date/time columns must be adjusted by the timezone offset, except time-derived columns at or above day level,
+ // such as DAY_START, WEEK_START, YEAR_START.
+ if (dim.getType().isDateTimeFamily() &&
+ !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
+ needAdjustTimeColumns.add(tupleIdx[i]);
}
i++;
}
@@ -175,7 +177,7 @@ public class CubeTupleConverter implements ITupleConverter {
int ti = tupleIdx[i];
if (ti >= 0) {
// add offset to return result according to timezone
- if (autoJustByTimezone && timestampColumn.contains(ti)) {
+ if (autoJustByTimezone && needAdjustTimeColumns.contains(ti)) {
// For streaming
try {
String v = toString(gtValues[i]);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
index a6a3da5..d820174 100755
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
@@ -51,7 +51,7 @@ public class StreamingTupleConverter {
final int dimCnt;
final int metricsCnt;
final MeasureType<?>[] measureTypes;
- final Set<Integer> timestampColumn = new HashSet<>();
+ final Set<Integer> needAdjustTimeColumns = new HashSet<>();
final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
final List<Integer> advMeasureIndexInGTValues;
@@ -76,8 +76,12 @@ public class StreamingTupleConverter {
// pre-calculate dimension index mapping to tuple
for (TblColRef dim : schema.getDimensions()) {
dimTupleIdx[idx] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
- if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
- timestampColumn.add(dimTupleIdx[idx]);
+ // All date/time columns must be adjusted by the timezone offset, except time-derived columns at or above day level,
+ // such as DAY_START, WEEK_START, YEAR_START.
+ if (dim.getType().isDateTimeFamily() &&
+ !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
+ needAdjustTimeColumns.add(dimTupleIdx[idx]);
+ }
idx++;
}
@@ -109,7 +113,7 @@ public class StreamingTupleConverter {
for (int i = 0; i < dimCnt; i++) {
int ti = dimTupleIdx[i];
if (ti >= 0) {
- if (autoTimezone && timestampColumn.contains(ti)) {
+ if (autoTimezone && needAdjustTimeColumns.contains(ti)) {
try {
tuple.setDimensionValue(ti, Long.toString(Long.parseLong(dimValues[i]) + TIME_ZONE_OFFSET));
} catch (NumberFormatException nfe) {
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 4342715..1bf8046 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -149,9 +149,16 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
String columnName = column.getName();
TimeDerivedColumnType columnType = TimeDerivedColumnType.getTimeDerivedColumnType(columnName);
if (columnType != null) {
- if (timeZoneOffset > 0 && TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
+ // For example, when the timezone is GMT+8:
+ // t = 1612506748660 represents 2021-02-05 14:32:28 GMT+8, i.e. 2021-02-05 06:32:28 GMT+0.
+ // The day_start of 2021-02-05 14:32:28 GMT+8 is 2021-02-05 00:00:00 GMT+8.
+ // t + timeZoneOffset reads as 2021-02-05 14:32:28 when interpreted in GMT+0, so its normalized result is 2021-02-05 00:00:00 GMT+0.
+ // At query time, the converter only has to format this long as a time string using the GMT+0 timezone.
+ if (TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
result.add(String.valueOf(columnType.normalize(t + timeZoneOffset)));
} else {
+ // At query time, the converter must first add the timezone offset to this long and then format it
+ // as a time string using the GMT+0 timezone.
result.add(String.valueOf(columnType.normalize(t)));
}
} else {