You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/05 08:50:05 UTC

[kylin] branch master updated: fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 88205b9  fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns
88205b9 is described below

commit 88205b94bf5fb7451e01dd95fc08b35aade04570
Author: liukun4515 <li...@apache.org>
AuthorDate: Sat Feb 6 20:00:28 2021 +0800

    fix bug: real-time stream don't support GMT-N timezone and other non-derived time columns
---
 .../apache/kylin/storage/gtrecord/CubeTupleConverter.java    | 12 +++++++-----
 .../kylin/stream/core/query/StreamingTupleConverter.java     | 12 ++++++++----
 .../kylin/stream/source/kafka/TimedJsonStreamParser.java     |  9 ++++++++-
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 9911b51..5595f5a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -71,7 +71,7 @@ public class CubeTupleConverter implements ITupleConverter {
     public final List<Integer> advMeasureIndexInGTValues;
     private List<ILookupTable> usedLookupTables;
 
-    final Set<Integer> timestampColumn = new HashSet<>();
+    final Set<Integer> needAdjustTimeColumns = new HashSet<>();
     String eventTimezone;
     boolean autoJustByTimezone;
     private final long timeZoneOffset;
@@ -113,9 +113,11 @@ public class CubeTupleConverter implements ITupleConverter {
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : selectedDimensions) {
             tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-            if (TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())
-                    && !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
-                timestampColumn.add(tupleIdx[i]);
+            // all time columns should be adjusted using timezone offset except derived column above day,
+            // such as DAY_START, WEEK_START, YEAR_START.
+            if (dim.getType().isDateTimeFamily() &&
+                !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
+                needAdjustTimeColumns.add(tupleIdx[i]);
             }
             i++;
         }
@@ -175,7 +177,7 @@ public class CubeTupleConverter implements ITupleConverter {
             int ti = tupleIdx[i];
             if (ti >= 0) {
                 // add offset to return result according to timezone
-                if (autoJustByTimezone && timestampColumn.contains(ti)) {
+                if (autoJustByTimezone && needAdjustTimeColumns.contains(ti)) {
                     // For streaming
                     try {
                         String v = toString(gtValues[i]);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
index a6a3da5..d820174 100755
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
@@ -51,7 +51,7 @@ public class StreamingTupleConverter {
     final int dimCnt;
     final int metricsCnt;
     final MeasureType<?>[] measureTypes;
-    final Set<Integer> timestampColumn = new HashSet<>();
+    final Set<Integer> needAdjustTimeColumns = new HashSet<>();
 
     final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
     final List<Integer> advMeasureIndexInGTValues;
@@ -76,8 +76,12 @@ public class StreamingTupleConverter {
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : schema.getDimensions()) {
             dimTupleIdx[idx] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-            if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
-                timestampColumn.add(dimTupleIdx[idx]);
+            // all time columns should be adjusted using timezone offset except derived column above day,
+            // such as DAY_START, WEEK_START, YEAR_START.
+            if (dim.getType().isDateTimeFamily() &&
+                !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
+                needAdjustTimeColumns.add(dimTupleIdx[idx]);
+            }
             idx++;
         }
 
@@ -109,7 +113,7 @@ public class StreamingTupleConverter {
         for (int i = 0; i < dimCnt; i++) {
             int ti = dimTupleIdx[i];
             if (ti >= 0) {
-                if (autoTimezone && timestampColumn.contains(ti)) {
+                if (autoTimezone && needAdjustTimeColumns.contains(ti)) {
                     try {
                         tuple.setDimensionValue(ti, Long.toString(Long.parseLong(dimValues[i]) + TIME_ZONE_OFFSET));
                     } catch (NumberFormatException nfe) {
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 4342715..1bf8046 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -149,9 +149,16 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
                 String columnName = column.getName();
                 TimeDerivedColumnType columnType = TimeDerivedColumnType.getTimeDerivedColumnType(columnName);
                 if (columnType != null) {
-                    if (timeZoneOffset > 0 && TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
+                    // For example: timezone is GMT+8
+                    // t = 1612506748660 represents 2021-02-05 14:32:28 GMT+8, i.e. 2021-02-05 06:32:28 GMT+0.
+                    // The day_start of 2021-02-05 14:32:28 GMT+8 is 2021-02-05 00:00:00 GMT+8.
+                    // t + timeZoneOffset represents 2021-02-05 14:32:28 GMT+0, and the normalized result is 2021-02-05 00:00:00 GMT+0.
+                    // In the query converter, just convert the long to a time string using the GMT+0 timezone.
+                    if (TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
                         result.add(String.valueOf(columnType.normalize(t + timeZoneOffset)));
                     } else {
+                        // At query time, the converter should add the timezone offset to this long
+                        // and then format it as a time string using the GMT+0 timezone.
                         result.add(String.valueOf(columnType.normalize(t)));
                     }
                 } else {