You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/12/30 14:36:14 UTC

kylin git commit: KYLIN-1270 improve TimedJsonStreamParser to support week_start, month_start, quarter_start, year_start

Repository: kylin
Updated Branches:
  refs/heads/2.0-rc e0748f5b4 -> 5c83a14a4


KYLIN-1270 improve TimedJsonStreamParser to support week_start, month_start, quarter_start, year_start


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5c83a14a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5c83a14a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5c83a14a

Branch: refs/heads/2.0-rc
Commit: 5c83a14a4bea5e1496d49ed362593aaaf84922ed
Parents: e0748f5
Author: honma <ho...@ebay.com>
Authored: Wed Dec 30 21:33:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Dec 30 21:35:43 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/TimeUtil.java  | 44 +++++++++++++++--
 .../apache/kylin/common/util/TimeUtilTest.java  | 21 +++++++-
 .../source/kafka/TimedJsonStreamParser.java     | 51 ++++++++++++++------
 3 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index c79e88b..17868a6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -1,12 +1,13 @@
 package org.apache.kylin.common.util;
 
+import java.util.Calendar;
+import java.util.TimeZone;
+
 /**
  */
 public class TimeUtil {
-    public enum NormalizedTimeUnit {
-        MINUTE, HOUR, DAY
-    }
 
+    private static TimeZone gmt = TimeZone.getTimeZone("GMT");
     private static long ONE_MINUTE_TS = 60 * 1000;
     private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
     private static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
@@ -23,7 +24,40 @@ public class TimeUtil {
         return ts / ONE_DAY_TS * ONE_DAY_TS;
     }
 
-    public static long getNextPeriodStart(long ts, long period) {
-        return ((ts + period - 1) / period) * period;
+    public static long getWeekStart(long ts) {
+        Calendar calendar = Calendar.getInstance(gmt);
+        calendar.setTimeInMillis(getDayStart(ts));
+        calendar.add(Calendar.DAY_OF_WEEK, calendar.getFirstDayOfWeek() - calendar.get(Calendar.DAY_OF_WEEK));
+        return calendar.getTimeInMillis();
     }
+
+    public static long getMonthStart(long ts) {
+        Calendar calendar = Calendar.getInstance(gmt);
+        calendar.setTimeInMillis(ts);
+        int year = calendar.get(Calendar.YEAR);
+        int month = calendar.get(Calendar.MONTH);
+        calendar.clear();
+        calendar.set(year, month, 1);
+        return calendar.getTimeInMillis();
+    }
+
+    public static long getQuarterStart(long ts) {
+        Calendar calendar = Calendar.getInstance(gmt);
+        calendar.setTimeInMillis(ts);
+        int year = calendar.get(Calendar.YEAR);
+        int month = calendar.get(Calendar.MONTH);
+        calendar.clear();
+        calendar.set(year, month / 3 * 3, 1);
+        return calendar.getTimeInMillis();
+    }
+
+    public static long getYearStart(long ts) {
+        Calendar calendar = Calendar.getInstance(gmt);
+        calendar.setTimeInMillis(ts);
+        int year = calendar.get(Calendar.YEAR);
+        calendar.clear();
+        calendar.set(year, 0, 1);
+        return calendar.getTimeInMillis();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index d81d49a..3fdf6aa 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -3,14 +3,19 @@ package org.apache.kylin.common.util;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.TimeZone;
 
-import org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
  */
 public class TimeUtilTest {
+
+    public enum NormalizedTimeUnit {
+        MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, YEAR
+    }
+
     public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) {
         Calendar a = Calendar.getInstance();
         Calendar b = Calendar.getInstance();
@@ -28,6 +33,7 @@ public class TimeUtilTest {
     @Test
     public void basicTest() throws ParseException {
         java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
 
         long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
         Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
@@ -36,6 +42,19 @@ public class TimeUtilTest {
         long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
         Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
         Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+
+        long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+        Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStart(t3));
+        Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t3));
+        Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t3));
+        Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
+
+        long t4 = dateFormat.parse("2015/01/01 10:01:30").getTime();
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
+        Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
+        Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 00f93a5..c1d8379 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -34,22 +34,28 @@
 
 package org.apache.kylin.source.kafka;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import kafka.message.MessageAndOffset;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
 
 /**
  * each json message with a "timestamp" field
@@ -108,25 +114,38 @@ public final class TimedJsonStreamParser extends StreamingParser {
             }
             ArrayList<String> result = Lists.newArrayList();
 
+            long normalized = 0;
             for (TblColRef column : allColumns) {
                 String columnName = column.getName();
                 if (columnName.equalsIgnoreCase("minute_start")) {
-                    long minuteStart = TimeUtil.getMinuteStart(t);
-                    result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+                    normalized = TimeUtil.getMinuteStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized));
                 } else if (columnName.equalsIgnoreCase("hour_start")) {
-                    long hourStart = TimeUtil.getHourStart(t);
-                    result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+                    normalized = TimeUtil.getHourStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized));
                 } else if (columnName.equalsIgnoreCase("day_start")) {
-                    //of day start we'll add yyyy-mm-dd
-                    long ts = TimeUtil.getDayStart(t);
-                    result.add(DateFormat.formatToDateStr(ts));
+                    //from day_start on, formatTs will output date format
+                    normalized = TimeUtil.getDayStart(t);
+                    result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+                } else if (columnName.equalsIgnoreCase("week_start")) {
+                    normalized = TimeUtil.getWeekStart(t);
+                    result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+                } else if (columnName.equalsIgnoreCase("month_start")) {
+                    normalized = TimeUtil.getMonthStart(t);
+                    result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+                } else if (columnName.equalsIgnoreCase("quarter_start")) {
+                    normalized = TimeUtil.getQuarterStart(t);
+                    result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+                } else if (columnName.equalsIgnoreCase("year_start")) {
+                    normalized = TimeUtil.getYearStart(t);
+                    result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
                 } else {
                     String x = root.get(columnName.toLowerCase());
                     result.add(x);
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
 
         } catch (IOException e) {
             logger.error("error", e);