You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/12/30 14:36:14 UTC
kylin git commit: KYLIN-1270 improve TimedJsonStreamParser to support
week_start, month_start, quarter_start, year_start
Repository: kylin
Updated Branches:
refs/heads/2.0-rc e0748f5b4 -> 5c83a14a4
KYLIN-1270 improve TimedJsonStreamParser to support week_start, month_start, quarter_start, year_start
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5c83a14a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5c83a14a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5c83a14a
Branch: refs/heads/2.0-rc
Commit: 5c83a14a4bea5e1496d49ed362593aaaf84922ed
Parents: e0748f5
Author: honma <ho...@ebay.com>
Authored: Wed Dec 30 21:33:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Dec 30 21:35:43 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/TimeUtil.java | 44 +++++++++++++++--
.../apache/kylin/common/util/TimeUtilTest.java | 21 +++++++-
.../source/kafka/TimedJsonStreamParser.java | 51 ++++++++++++++------
3 files changed, 94 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index c79e88b..17868a6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -1,12 +1,13 @@
package org.apache.kylin.common.util;
+import java.util.Calendar;
+import java.util.TimeZone;
+
/**
*/
public class TimeUtil {
- public enum NormalizedTimeUnit {
- MINUTE, HOUR, DAY
- }
+ private static TimeZone gmt = TimeZone.getTimeZone("GMT");
private static long ONE_MINUTE_TS = 60 * 1000;
private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
private static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
@@ -23,7 +24,40 @@ public class TimeUtil {
return ts / ONE_DAY_TS * ONE_DAY_TS;
}
- public static long getNextPeriodStart(long ts, long period) {
- return ((ts + period - 1) / period) * period;
+ /**
+ * Truncates a timestamp to the start of the week that contains it, evaluated in GMT.
+ * Weeks always begin on Sunday, independent of the JVM's default locale.
+ *
+ * @param ts timestamp in epoch milliseconds
+ * @return epoch milliseconds of the day start (see getDayStart) of the Sunday on or before ts
+ */
+ public static long getWeekStart(long ts) {
+ Calendar calendar = Calendar.getInstance(gmt);
+ calendar.setTimeInMillis(getDayStart(ts));
+ // Calendar.getFirstDayOfWeek() follows the default locale (Monday in many locales). With a
+ // Monday start the offset below becomes positive on Sundays and moves forward into the next
+ // week. Pinning the start to SUNDAY keeps the offset within [-6, 0] on every JVM.
+ calendar.add(Calendar.DAY_OF_WEEK, Calendar.SUNDAY - calendar.get(Calendar.DAY_OF_WEEK));
+ return calendar.getTimeInMillis();
}
+
+ /**
+ * Returns the first instant of the calendar month (in GMT) that contains the given timestamp.
+ *
+ * @param ts timestamp in epoch milliseconds
+ * @return epoch milliseconds of day 1 of that month with all other calendar fields cleared
+ */
+ public static long getMonthStart(long ts) {
+ Calendar cal = Calendar.getInstance(gmt);
+ cal.setTimeInMillis(ts);
+ final int yearOfTs = cal.get(Calendar.YEAR);
+ final int monthOfTs = cal.get(Calendar.MONTH);
+ // wipe every field, then rebuild the date from year and month only
+ cal.clear();
+ cal.set(Calendar.YEAR, yearOfTs);
+ cal.set(Calendar.MONTH, monthOfTs);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ return cal.getTimeInMillis();
+ }
+
+ /**
+ * Returns the first instant of the calendar quarter (in GMT) that contains the given timestamp.
+ * Quarters start in January, April, July and October.
+ *
+ * @param ts timestamp in epoch milliseconds
+ * @return epoch milliseconds of day 1 of the quarter's first month
+ */
+ public static long getQuarterStart(long ts) {
+ Calendar cal = Calendar.getInstance(gmt);
+ cal.setTimeInMillis(ts);
+ final int yearOfTs = cal.get(Calendar.YEAR);
+ final int monthOfTs = cal.get(Calendar.MONTH);
+ // Calendar months are 0-based, so dropping the remainder mod 3 lands on 0, 3, 6 or 9
+ final int firstMonthOfQuarter = monthOfTs - monthOfTs % 3;
+ cal.clear();
+ cal.set(Calendar.YEAR, yearOfTs);
+ cal.set(Calendar.MONTH, firstMonthOfQuarter);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ return cal.getTimeInMillis();
+ }
+
+ /**
+ * Returns the first instant of the calendar year (in GMT) that contains the given timestamp.
+ *
+ * @param ts timestamp in epoch milliseconds
+ * @return epoch milliseconds of January 1 of that year
+ */
+ public static long getYearStart(long ts) {
+ Calendar cal = Calendar.getInstance(gmt);
+ cal.setTimeInMillis(ts);
+ final int yearOfTs = cal.get(Calendar.YEAR);
+ // keep only the year; everything below it is reset to the start of the year
+ cal.clear();
+ cal.set(Calendar.YEAR, yearOfTs);
+ cal.set(Calendar.MONTH, Calendar.JANUARY);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ return cal.getTimeInMillis();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index d81d49a..3fdf6aa 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -3,14 +3,19 @@ package org.apache.kylin.common.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
+import java.util.TimeZone;
-import org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class TimeUtilTest {
+
+ /** Time units accepted by normalizeTime, ordered from finest to coarsest. */
+ public enum NormalizedTimeUnit {
+ MINUTE,
+ HOUR,
+ DAY,
+ WEEK,
+ MONTH,
+ QUARTER,
+ YEAR
+ }
+
public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) {
Calendar a = Calendar.getInstance();
Calendar b = Calendar.getInstance();
@@ -28,6 +33,7 @@ public class TimeUtilTest {
@Test
public void basicTest() throws ParseException {
java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
@@ -36,6 +42,19 @@ public class TimeUtilTest {
long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+
+ long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
+ Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStart(t3));
+ Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t3));
+ Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t3));
+ Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
+
+ long t4 = dateFormat.parse("2015/01/01 10:01:30").getTime();
+ Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
+ Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
+ Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
+ Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
+
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5c83a14a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 00f93a5..c1d8379 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -34,22 +34,28 @@
package org.apache.kylin.source.kafka;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import kafka.message.MessageAndOffset;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.*;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
/**
* each json message with a "timestamp" field
@@ -108,25 +114,38 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
ArrayList<String> result = Lists.newArrayList();
+ long normalized = 0;
for (TblColRef column : allColumns) {
String columnName = column.getName();
if (columnName.equalsIgnoreCase("minute_start")) {
- long minuteStart = TimeUtil.getMinuteStart(t);
- result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+ normalized = TimeUtil.getMinuteStart(t);
+ result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized));
} else if (columnName.equalsIgnoreCase("hour_start")) {
- long hourStart = TimeUtil.getHourStart(t);
- result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+ normalized = TimeUtil.getHourStart(t);
+ result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized));
} else if (columnName.equalsIgnoreCase("day_start")) {
- //of day start we'll add yyyy-mm-dd
- long ts = TimeUtil.getDayStart(t);
- result.add(DateFormat.formatToDateStr(ts));
+ // for day_start and coarser units, formatTs emits a date string (formatToDateStr) instead of a time string
+ normalized = TimeUtil.getDayStart(t);
+ result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+ } else if (columnName.equalsIgnoreCase("week_start")) {
+ normalized = TimeUtil.getWeekStart(t);
+ result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+ } else if (columnName.equalsIgnoreCase("month_start")) {
+ normalized = TimeUtil.getMonthStart(t);
+ result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+ } else if (columnName.equalsIgnoreCase("quarter_start")) {
+ normalized = TimeUtil.getQuarterStart(t);
+ result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
+ } else if (columnName.equalsIgnoreCase("year_start")) {
+ normalized = TimeUtil.getYearStart(t);
+ result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized));
} else {
String x = root.get(columnName.toLowerCase());
result.add(x);
}
}
- return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+ return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
} catch (IOException e) {
logger.error("error", e);