You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/12 13:53:11 UTC

[kylin] branch master updated: KYLIN-3788 Modify the time conversion time zone of the kafka streaming access

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new e13f101  KYLIN-3788  Modify the time conversion time zone of the kafka streaming access
e13f101 is described below

commit e13f101cb52c75a5ebf581f490b7a603d0ba4289
Author: zhaojintaozhao <49...@users.noreply.github.com>
AuthorDate: Fri Apr 12 21:53:06 2019 +0800

    KYLIN-3788  Modify the time conversion time zone of the kafka streaming access
    
    * change simpleformat timezone of   kafka streaming timestamp, such as  hour_start, day_start ...
    
    * Modify the time conversion logic of the  kafka data access; this fix solve the problem that the streaming task time is different from the realtime due to the time zone in kafka streaming.
    
    * KYLIN-3788 This commit Changes SimpleDateformat to FastDateFormat because SimpleDateFormat is forbidden API.
---
 .../org/apache/kylin/common/util/DateFormat.java   |  13 +++
 .../org/apache/kylin/common/util/TimeUtil.java     |  69 +++++++++++---
 .../org/apache/kylin/common/util/TimeUtilTest.java | 104 +++++++++++++++++++--
 .../apache/kylin/source/kafka/StreamingParser.java |  29 +++---
 4 files changed, 184 insertions(+), 31 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 5035e03..e7a5579 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -96,6 +96,19 @@ public class DateFormat {
         return date;
     }
 
+    public static String formatToTimeStrWithTimeZone(TimeZone timeZone, long mills){
+        return formatToStrWithTimeZone(timeZone, mills, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+    }
+
+    public static String formatToDateStrWithTimeZone(TimeZone timeZone, long mills){
+        return formatToStrWithTimeZone(timeZone, mills, DEFAULT_DATE_PATTERN);
+    }
+
+    private static String formatToStrWithTimeZone(TimeZone timeZone, long mills, String pattern){
+        FastDateFormat dateFormat =  FastDateFormat.getInstance(pattern, timeZone);
+        return dateFormat.format(new Date(mills));
+    }
+
     public static long stringToMillis(String str) {
         // try to be smart and guess the date format
         if (isAllDigits(str)) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index e92d149..6eb324b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -44,18 +44,37 @@ public class TimeUtil {
     }
 
     public static long getDayStart(long ts) {
-        return ts / ONE_DAY_TS * ONE_DAY_TS;
+        return getDayStartWithTimeZone(gmt, ts);
+    }
+
+    public static long getDayStartWithTimeZone(TimeZone timeZone, long ts){
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
+        calendar.setTimeInMillis(ts);
+        int year = calendar.get(Calendar.YEAR);
+        int month = calendar.get(Calendar.MONTH);
+        int date = calendar.get(Calendar.DATE);
+        calendar.clear();
+        calendar.set(year, month, date);
+        return calendar.getTimeInMillis();
     }
 
     public static long getWeekStart(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
-        calendar.setTimeInMillis(getDayStart(ts));
+        return getWeekStartWithTimeZone(gmt, ts);
+    }
+
+    public static long getWeekStartWithTimeZone(TimeZone timeZone, long ts){
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
+        calendar.setTimeInMillis(getDayStartWithTimeZone(timeZone, ts));
         calendar.add(Calendar.DAY_OF_WEEK, calendar.getFirstDayOfWeek() - calendar.get(Calendar.DAY_OF_WEEK));
         return calendar.getTimeInMillis();
     }
 
     public static long getMonthStart(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
+        return getMonthStartWithTimeZone(gmt, ts);
+    }
+
+    public static long getMonthStartWithTimeZone(TimeZone timeZone, long ts){
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
         calendar.setTimeInMillis(ts);
         int year = calendar.get(Calendar.YEAR);
         int month = calendar.get(Calendar.MONTH);
@@ -65,7 +84,11 @@ public class TimeUtil {
     }
 
     public static long getQuarterStart(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
+        return getQuarterStartWithTimeZone(gmt, ts);
+    }
+
+    public static long getQuarterStartWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
         calendar.setTimeInMillis(ts);
         int year = calendar.get(Calendar.YEAR);
         int month = calendar.get(Calendar.MONTH);
@@ -75,7 +98,11 @@ public class TimeUtil {
     }
 
     public static long getYearStart(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
+        return getYearStartWithTimeZone(gmt, ts);
+    }
+
+    public static long getYearStartWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
         calendar.setTimeInMillis(ts);
         int year = calendar.get(Calendar.YEAR);
         calendar.clear();
@@ -84,14 +111,22 @@ public class TimeUtil {
     }
 
     public static long getWeekEnd(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
-        calendar.setTimeInMillis(getWeekStart(ts));
+        return getWeekEndWithTimeZone(gmt, ts);
+    }
+
+    public static long getWeekEndWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
+        calendar.setTimeInMillis(getWeekStartWithTimeZone(timeZone, ts));
         calendar.add(Calendar.DAY_OF_WEEK, 7);
         return calendar.getTimeInMillis();
     }
 
     public static long getMonthEnd(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
+        return getMonthEndWithTimeZone(gmt, ts);
+    }
+
+    public static long getMonthEndWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
         calendar.setTimeInMillis(ts);
         calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONDAY), calendar.get(Calendar.DAY_OF_MONTH), 0, 0, 0);
         calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMaximum(Calendar.DAY_OF_MONTH));
@@ -100,15 +135,23 @@ public class TimeUtil {
     }
 
     public static long getQuarterEnd(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
-        calendar.setTimeInMillis(getQuarterStart(ts));
+        return getQuarterEndWithTimeZone(gmt, ts);
+    }
+
+    public static long getQuarterEndWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
+        calendar.setTimeInMillis(getQuarterStartWithTimeZone(timeZone, ts));
         calendar.add(Calendar.MONTH, 3);
         return calendar.getTimeInMillis();
     }
 
     public static long getYearEnd(long ts) {
-        Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
-        calendar.setTimeInMillis(getYearStart(ts));
+        return getYearEndWithTimeZone(gmt, ts);
+    }
+
+    public static long getYearEndWithTimeZone(TimeZone timeZone, long ts) {
+        Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
+        calendar.setTimeInMillis(getYearStartWithTimeZone(timeZone, ts));
         calendar.add(Calendar.YEAR, 1);
         return calendar.getTimeInMillis();
     }
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index ced7125..7f7ead1 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -63,17 +63,109 @@ public class TimeUtilTest {
         Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
 
         long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/12/31 00:00:00").getTime(), TimeUtil.getDayStart(t3));
+        Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
         Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStart(t3));
         Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t3));
         Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t3));
-        Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/6 00:00:00").getTime(), TimeUtil.getWeekEnd(t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t3));
+
 
-        long t4 = dateFormat.parse("2015/01/01 10:01:30").getTime();
-        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
-        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
-        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
-        Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
+        long t4 = dateFormat.parse("2012/10/29 11:02:01").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/10/29 00:00:00").getTime(), TimeUtil.getDayStart(t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
+        Assert.assertEquals(dateFormat.parse("2012/11/4 00:00:00").getTime(), TimeUtil.getWeekEnd(t4));
+        Assert.assertEquals(dateFormat.parse("2012/11/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t4));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t4));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t4));
 
+        long t5 = dateFormat.parse("2012/8/29 23:12:41").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/8/29 00:00:00").getTime(), TimeUtil.getDayStart(t5));
+        Assert.assertEquals(dateFormat.parse("2012/8/26 00:00:00").getTime(), TimeUtil.getWeekStart(t5));
+        Assert.assertEquals(dateFormat.parse("2012/8/1 00:00:00").getTime(), TimeUtil.getMonthStart(t5));
+        Assert.assertEquals(dateFormat.parse("2012/7/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t5));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t5));
+        Assert.assertEquals(dateFormat.parse("2012/9/2 00:00:00").getTime(), TimeUtil.getWeekEnd(t5));
+        Assert.assertEquals(dateFormat.parse("2012/9/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t5));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t5));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t5));
+
+        long t6 = dateFormat.parse("2015/01/01 10:01:30").getTime();
+        Assert.assertEquals(dateFormat.parse("2015/01/01 00:00:00").getTime(), TimeUtil.getDayStart(t6));
+        Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/4 00:00:00").getTime(), TimeUtil.getWeekEnd(t6));
+        Assert.assertEquals(dateFormat.parse("2015/2/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t6));
+        Assert.assertEquals(dateFormat.parse("2015/4/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t6));
+        Assert.assertEquals(dateFormat.parse("2016/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t6));
     }
 
+
+    @Test
+    public void basicTestWithTimeZone() throws ParseException {
+        java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.ROOT);
+        TimeZone timeZone = TimeZone.getTimeZone("GMT+8");
+        dateFormat.setTimeZone(timeZone);
+
+        long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+
+        long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+
+        long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/12/31 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/6 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t3));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t3));
+
+
+        long t4 = dateFormat.parse("2012/10/29 11:02:01").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/10/29 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/28 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/11/4 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2012/11/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t4));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t4));
+
+        long t5 = dateFormat.parse("2012/8/29 23:12:41").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/8/29 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/8/26 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/8/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/7/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/9/2 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/9/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t5));
+        Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t5));
+
+        long t6 = dateFormat.parse("2015/01/01 10:01:30").getTime();
+        Assert.assertEquals(dateFormat.parse("2015/01/01 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/1/4 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/2/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2015/4/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t6));
+        Assert.assertEquals(dateFormat.parse("2016/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t6));
+    }
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index c97db36..041f240 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -25,6 +25,9 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import java.nio.ByteBuffer;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.common.util.TimeUtil;
@@ -108,6 +111,8 @@ public abstract class StreamingParser {
      * @return true if the columnName is a derived time column; otherwise false;
      */
     public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
+        String timeZoneStr = KylinConfig.getInstanceFromEnv().getTimeZone();
+        TimeZone timeZone = TimeZone.getTimeZone(timeZoneStr);
 
         Integer derivedTimeColumn = derivedTimeColumns.get(columnName);
         if (derivedTimeColumn == null) {
@@ -118,31 +123,31 @@ public abstract class StreamingParser {
         switch (derivedTimeColumn) {
         case 1:
             normalized = TimeUtil.getMinuteStart(t);
-            result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
+            result.add(DateFormat.formatToTimeStrWithTimeZone(timeZone, normalized));
             break;
         case 2:
             normalized = TimeUtil.getHourStart(t);
-            result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
+            result.add(DateFormat.formatToTimeStrWithTimeZone(timeZone, normalized));
             break;
         case 3:
-            normalized = TimeUtil.getDayStart(t);
-            result.add(DateFormat.formatToDateStr(normalized));
+            normalized = TimeUtil.getDayStartWithTimeZone(timeZone, t);
+            result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
             break;
         case 4:
-            normalized = TimeUtil.getWeekStart(t);
-            result.add(DateFormat.formatToDateStr(normalized));
+            normalized = TimeUtil.getWeekStartWithTimeZone(timeZone, t);
+            result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
             break;
         case 5:
-            normalized = TimeUtil.getMonthStart(t);
-            result.add(DateFormat.formatToDateStr(normalized));
+            normalized = TimeUtil.getMonthStartWithTimeZone(timeZone, t);
+            result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
             break;
         case 6:
-            normalized = TimeUtil.getQuarterStart(t);
-            result.add(DateFormat.formatToDateStr(normalized));
+            normalized = TimeUtil.getQuarterStartWithTimeZone(timeZone, t);
+            result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
             break;
         case 7:
-            normalized = TimeUtil.getYearStart(t);
-            result.add(DateFormat.formatToDateStr(normalized));
+            normalized = TimeUtil.getYearStartWithTimeZone(timeZone, t);
+            result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
             break;
         default:
             throw new IllegalStateException();