You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/06 12:36:38 UTC

[kylin] branch master updated: KYLIN-4167 Phase2 (#961)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new f2bea28  KYLIN-4167 Phase2 (#961)
f2bea28 is described below

commit f2bea280d12061b2fb5f39f5b301579178d70e7a
Author: Xiaoxiang Yu <hi...@126.com>
AuthorDate: Fri Dec 6 20:36:30 2019 +0800

    KYLIN-4167 Phase2 (#961)
    
    * KYLIN-4167 Phase2
    
    - Fix bugs that occur when a cubing job has been dropped
    - Drop former segment info when submitting a new one
    - Add a new log file for the streaming coordinator, with more logging to make diagnosis easier.
    
    * KYLIN-4010 Auto adjust timezone
    
    * KYLIN-4010 Fix segment purge bugs
---
 build/conf/kylin-server-log4j.properties           |  24 ++++-
 .../org/apache/kylin/common/KylinConfigBase.java   |  10 +-
 .../org/apache/kylin/common/util/DateFormat.java   |  14 ++-
 .../org/apache/kylin/common/util/TimeUtil.java     |   6 +-
 .../src/main/resources/kylin-defaults.properties   |  35 ++++++-
 .../kylin/dimension/TimeDerivedColumnType.java     |  18 ++++
 .../kylin/storage/gtrecord/CubeTupleConverter.java |  17 +--
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |   2 +-
 .../query/relnode/visitor/TupleFilterVisitor.java  |  13 ++-
 .../rest/controller/StreamingV2Controller.java     |  11 +-
 .../kylin/storage/stream/StreamStorageQuery.java   |   2 +-
 .../stream/coordinator/StreamMetadataStore.java    |  23 ++++
 .../coordinator/ZookeeperStreamMetadataStore.java  | 116 ++++++++++++++++++++-
 .../coordinator/assign/AssignmentsCache.java       |  66 ++++++++----
 .../stream/coordinator/assign/DefaultAssigner.java |   9 ++
 .../coordinator/coordinate/BuildJobSubmitter.java  |  93 +++++++++++------
 .../coordinate/ReceiverClusterManager.java         |  35 ++++---
 .../coordinate/StreamingCoordinator.java           |  14 +--
 .../coordinator/doctor/ClusterStateChecker.java    |  64 ++++++++++--
 .../coordinate/BuildJobSubmitterTest.java          |   7 +-
 .../coordinator/coordinate/StreamingTestBase.java  |   1 -
 .../stream/core/client/ReceiverAdminClient.java    |   5 +-
 .../core/consumer/StreamingConsumerChannel.java    |   1 +
 .../core/query/StreamingDataQueryPlanner.java      |  15 ++-
 .../stream/core/query/StreamingTupleConverter.java |   4 +-
 .../core/source/IStreamingMessageParser.java       |   3 +
 .../storage/columnar/FragmentFileSearcher.java     |  23 ++--
 .../core/util/CompareFilterTimeRangeChecker.java   |  17 ++-
 .../stream/source/kafka/TimedJsonStreamParser.java |  11 +-
 29 files changed, 525 insertions(+), 134 deletions(-)

diff --git a/build/conf/kylin-server-log4j.properties b/build/conf/kylin-server-log4j.properties
index e4d204f..bcaea65 100644
--- a/build/conf/kylin-server-log4j.properties
+++ b/build/conf/kylin-server-log4j.properties
@@ -25,8 +25,24 @@ log4j.appender.file.Append=true
 log4j.appender.file.MaxFileSize=268435456
 log4j.appender.file.MaxBackupIndex=10
 
+log4j.appender.realtime=org.apache.log4j.RollingFileAppender
+log4j.appender.realtime.layout=org.apache.log4j.PatternLayout
+log4j.appender.realtime.File=${catalina.home}/../logs/streaming_coordinator.log
+log4j.appender.realtime.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+log4j.appender.realtime.Append=true
+log4j.appender.realtime.MaxFileSize=268435456
+log4j.appender.realtime.MaxBackupIndex=10
+
 #overall config
-log4j.rootLogger=INFO,file
-log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=INFO
\ No newline at end of file
+log4j.rootLogger=INFO
+log4j.logger.org.apache.kylin=DEBUG,file
+log4j.logger.org.springframework=WARN,file
+log4j.logger.org.springframework.security=INFO,file
+
+log4j.additivity.logger.org.apache.kylin.stream=false
+log4j.logger.org.apache.kylin.stream=TRACE,realtime
+log4j.logger.org.apache.kylin.job=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.service.StreamingCoordinatorService=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.service.StreamingV2Service=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.controller.StreamingCoordinatorController=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.controller.StreamingV2Controller=DEBUG,realtime
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bbffcc1..d44d944 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2116,7 +2116,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // streaming
+    // Realtime streaming
     // ============================================================================
     public String getStreamingStoreClass() {
         return getOptional("kylin.stream.store.class",
@@ -2281,8 +2281,12 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail
      */
-    public boolean isStreamingAutoJustTimezone() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.auto.just.by.timezone", "false"));
+    public String getStreamingDerivedTimeTimezone() {
+        return (getOptional("kylin.stream.event.timezone", ""));
+    }
+
+    public boolean isAutoResubmitDiscardJob(){
+        return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
     }
 
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 1c6b551..2f51cbb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -32,6 +32,7 @@ public class DateFormat {
     public static final String DEFAULT_TIME_PATTERN = "HH:mm:ss";
     public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
     public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
+    public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET = "yyyy-MM-dd HH:mm:ss.SSSZZ";
     public static final String YYYY_MM_DD_HH_MM = "yyyy-MM-dd HH:mm";
     public static final String YYYY_MM_DD_HH = "yyyy-MM-dd HH";
     public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
@@ -39,11 +40,12 @@ public class DateFormat {
     public static final String YYYYMMDDHH = "yyyyMMddHH";
     public static final String ISO_8601_24H_FULL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";
 
-    public static final String[] SUPPORTED_DATETIME_PATTERN = { //
-            DEFAULT_DATE_PATTERN, //
-            DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS, //
-            DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS, //
-            COMPACT_DATE_PATTERN, //
+    public static final String[] SUPPORTED_DATETIME_PATTERN = {
+            DEFAULT_DATE_PATTERN,
+            DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS,
+            DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS,
+            DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET,
+            COMPACT_DATE_PATTERN,
             ISO_8601_24H_FULL_FORMAT,
             YYYY_MM_DD_HH_MM,
             YYYY_MM_DD_HH,
@@ -143,6 +145,8 @@ public class DateFormat {
         } else if (str.length() > 19) {
             if (str.contains("T")) {
                 return stringToDate(str, ISO_8601_24H_FULL_FORMAT).getTime();
+            } else if (str.contains("+")) {
+                return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET).getTime();
             } else {
                 return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
             }
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 6eb324b..cd53cc6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -31,9 +31,9 @@ public class TimeUtil {
     }
 
     private static TimeZone gmt = TimeZone.getTimeZone("GMT");
-    public static long ONE_MINUTE_TS = 60 * 1000L;
-    public static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
-    public static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
+    public static final long ONE_MINUTE_TS = 60 * 1000L;
+    public static final long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+    public static final long ONE_DAY_TS = 24 * ONE_HOUR_TS;
 
     public static long getMinuteStart(long ts) {
         return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 10ca8e6..cc6ee38 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -368,4 +368,37 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 #kylin.engine.livy-conf.livy-enabled=false
 #kylin.engine.livy-conf.livy-url=http://LivyHost:8998
 #kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar
-#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
\ No newline at end of file
+#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
+
+
+### Realtime OLAP ###
+
+# Where should local segment cache located, for absolute path, the real path will be ${KYLIN_HOME}/${kylin.stream.index.path}
+kylin.stream.index.path=stream_index
+
+# The timezone for derived time columns such as hour_start; try setting it to GMT+N. See KYLIN-4010 for details
+kylin.stream.event.timezone=
+
+# Debug switch for print realtime global dict encode information, please check detail at KYLIN-4141
+kylin.stream.print-realtime-dict-enabled=false
+
+# Should enable latest coordinator, please check detail at KYLIN-4167
+kylin.stream.new.coordinator-enabled=true
+
+# In which way should we collect receiver's metrics info
+#kylin.stream.metrics.option=console/csv/jmx
+
+# When enabling a streaming cube, whether to consume from the earliest offset or the latest offset
+kylin.stream.consume.offsets.latest=true
+
+# The parallelism of scan in receiver side
+kylin.stream.receiver.use-threads-per-query=8
+
+# How the coordinator/receiver registers itself into StreamMetadata; there are three options:
+# 1. hostname:port, then kylin will set the config ip and port as the currentNode;
+# 2. port, then kylin will get the node's hostname and append port as the currentNode;
+# 3. not set, then kylin will get the node hostname address and set the hostname and defaultPort(7070 for coordinator or 9090 for receiver) as the currentNode.
+#kylin.stream.node=
+
+# Automatically resubmit the job after it has been discarded
+kylin.stream.auto-resubmit-after-discard-enabled=true
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
index 01953e7..43cff4e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
@@ -28,6 +28,8 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public enum TimeDerivedColumnType {
     MINUTE_START("minute_start") {
@@ -157,6 +159,7 @@ public enum TimeDerivedColumnType {
     private static final String QUARTER_START_NAME = "quarter_start";
     private static final String YEAR_START_NAME = "year_start";
     private static Map<String, TimeDerivedColumnType> nameColumnsMap = Maps.newHashMap();
+    private static Logger logger = LoggerFactory.getLogger(TimeDerivedColumnType.class);
 
     static {
         nameColumnsMap.put(MINUTE_START_NAME, MINUTE_START);
@@ -178,6 +181,14 @@ public enum TimeDerivedColumnType {
         return nameColumnsMap.containsKey(columnName.toLowerCase(Locale.ROOT));
     }
 
+    public static boolean isTimeDerivedColumnAboveDayLevel(String columnName) {
+        if (!isTimeDerivedColumn(columnName))
+            return false;
+        else {
+            return !columnName.equalsIgnoreCase(MINUTE_START_NAME) && !columnName.equalsIgnoreCase(HOUR_START_NAME);
+        }
+    }
+
     public static TimeDerivedColumnType getTimeDerivedColumnType(String columnName) {
         return nameColumnsMap.get(columnName.toLowerCase(Locale.ROOT));
     }
@@ -211,6 +222,13 @@ public enum TimeDerivedColumnType {
         return calculateTimeUnitRange(time);
     }
 
+    public Pair<Long, Long> getTimeUnitRangeTimezoneAware(Object timeValue, long timezoneOffset){
+        long ts = parseTimeValue(timeValue);
+        Pair<Long, Long> res = calculateTimeUnitRange(ts);
+        res = new Pair<>(res.getFirst() - timezoneOffset, res.getSecond() - timezoneOffset);
+        return res;
+    }
+
     abstract public Pair<Long, Long> calculateTimeUnitRange(long time);
 
     abstract public String normalizeTimeFormat(long time);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 94734b3..8e2a795 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -69,9 +69,9 @@ public class CubeTupleConverter implements ITupleConverter {
     private List<ILookupTable> usedLookupTables;
 
     final Set<Integer> timestampColumn = new HashSet<>();
+    String eventTimezone;
     boolean autoJustByTimezone;
-    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
-            .getRawOffset();
+    private final long timeZoneOffset;
 
     public final int nSelectedDims;
 
@@ -92,11 +92,14 @@ public class CubeTupleConverter implements ITupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
         usedLookupTables = Lists.newArrayList();
-        autoJustByTimezone = cubeSeg.getConfig().isStreamingAutoJustTimezone();
-        autoJustByTimezone = autoJustByTimezone
+        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+        autoJustByTimezone = eventTimezone.length() > 0
                 && cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
         if (autoJustByTimezone) {
             logger.debug("Will ajust dimsension for Time Derived Column.");
+            timeZoneOffset = TimeZone.getTimeZone(eventTimezone).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
         }
         ////////////
 
@@ -105,8 +108,10 @@ public class CubeTupleConverter implements ITupleConverter {
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : selectedDimensions) {
             tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-            if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
+            if (TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())
+                    && !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
                 timestampColumn.add(tupleIdx[i]);
+            }
             i++;
         }
 
@@ -167,7 +172,7 @@ public class CubeTupleConverter implements ITupleConverter {
                     try {
                         String v = toString(gtValues[i]);
                         if (v != null) {
-                            tuple.setDimensionValue(ti, Long.toString(Long.parseLong(v) + TIME_ZONE_OFFSET));
+                            tuple.setDimensionValue(ti, Long.toString(Long.parseLong(v) + timeZoneOffset));
                         }
                     } catch (NumberFormatException nfe) {
                         logger.warn("{} is not a long value.", gtValues[i]);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 3a76be6..bfd6c4d 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -55,7 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
     ColumnRowType columnRowType;
     OLAPContext context;
-    boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
+    boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
 
     public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
         super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index 3e07eca..e0792a0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -52,6 +52,8 @@ import org.apache.kylin.metadata.filter.UnsupportedTupleFilter;
 import org.apache.kylin.metadata.filter.function.Functions;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.relnode.ColumnRowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.util.GregorianCalendar;
@@ -62,11 +64,13 @@ import java.util.TimeZone;
 
 public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
+    private static Logger logger = LoggerFactory.getLogger(TupleFilterVisitor.class);
+
     final ColumnRowType inputRowType;
 
     // is the fact table is a streamingv2 table
     private boolean autoJustByTimezone = false;
-    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
             .getRawOffset();
 
     public TupleFilterVisitor(ColumnRowType inputRowType) {
@@ -221,9 +225,10 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
                     newValues.add(null);
                 } else {
                     long ts = DateFormat.stringToMillis(v.toString());
-                    //  minus offset by timezone in RelNode level
-                    // this will affect request sent to storage level
-                    if (autoJustByTimezone) {
+                    // Convert column values of date/timestamp type from the local timezone to UTC by subtracting the offset at the RelNode level.
+                    // This changes the request sent to the storage level (receiver), and thus affects segment/fragment level purge.
+                    if (autoJustByTimezone && (type.getFamily() == SqlTypeFamily.TIMESTAMP
+                            || type.getFamily() == SqlTypeFamily.DATETIME)) {
                         ts -= TIME_ZONE_OFFSET;
                     }
                     newValues.add(String.valueOf(ts));
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index f22ec89..cfd7086 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -189,7 +190,7 @@ public class StreamingV2Controller extends BasicController {
             List<FieldSchema> fields;
             try {
                 HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf());
-                fields = metaStoreClient.getFields(tableDesc.getDatabase(), tableDesc.getName());
+                fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName());
             } catch (NoSuchObjectException noObjectException) {
                 logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(),
                         noObjectException);
@@ -208,8 +209,12 @@ public class StreamingV2Controller extends BasicController {
             for (ColumnDesc columnDesc : tableDesc.getColumns()) {
                 FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
                 if (fieldSchema == null) {
-                    incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
-                    continue;
+                    if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
+                        incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
+                        continue;
+                    } else {
+                        continue;
+                    }
                 }
                 if (!checkHiveTableFieldCompatible(fieldSchema, columnDesc)) {
                     String msg = String.format(Locale.ROOT,
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
index f2050c1..b01e183 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
@@ -106,7 +106,7 @@ public class StreamStorageQuery extends CubeStorageQuery {
 
         ITupleIterator realTimeResult;
         if (segmentsPlanner.canSkip(maxHistorySegmentTime, Long.MAX_VALUE)) {
-            logger.info("Skip scan realTime data");
+            logger.info("Skip scan realTime data, {}", maxHistorySegmentTime);
             realTimeResult = ITupleIterator.EMPTY_TUPLE_ITERATOR;
         } else {
             boolean isSelectAllQuery = isSelectAllQuery(request.getCuboid(), request.getGroups(), request.getFilter());
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
index 309bee9..c8d4197 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
@@ -28,21 +28,42 @@ import org.apache.kylin.stream.core.model.SegmentBuildState;
 import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
 import org.apache.kylin.stream.core.source.Partition;
 
+/**
+ * Independent metadata store for the realtime streaming cluster; this metadata is transient.
+ */
 public interface StreamMetadataStore {
+    /**
+     * @return all streaming receivers, whether alive or not
+     */
     List<Node> getReceivers();
 
+    /**
+     * @return all streaming cubes which are in the enabled state
+     */
     List<String> getCubes();
 
+    /**
+     * @param cubeName the cube which needs to be enabled
+     */
     void addStreamingCube(String cubeName);
 
+    /**
+     * @param cubeName the cube which needs to be disabled (stop consuming)
+     */
     void removeStreamingCube(String cubeName);
 
     StreamingCubeConsumeState getStreamingCubeConsumeState(String cubeName);
 
     void saveStreamingCubeConsumeState(String cubeName, StreamingCubeConsumeState state);
 
+    /**
+     * @param receiver the streaming receiver which should be created
+     */
     void addReceiver(Node receiver);
 
+    /**
+     * @param receiver the streaming receiver which needs to be removed
+     */
     void removeReceiver(Node receiver);
 
     void removeCubeAssignment(String cubeName);
@@ -115,4 +136,6 @@ public interface StreamMetadataStore {
     SegmentBuildState getSegmentBuildState(String cubeName, String segmentName);
 
     boolean removeSegmentBuildState(String cubeName, String segmentName);
+
+    default void reportStat(){}
 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
index 4431b4c..3de0c82 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
@@ -61,6 +62,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     private String cubeRoot;
     private String coordinatorRoot;
 
+    private AtomicLong readSuccess = new AtomicLong();
+    private AtomicLong readFail = new AtomicLong();
+    private AtomicLong writeSuccess = new AtomicLong();
+    private AtomicLong writeFail = new AtomicLong();
+
     public ZookeeperStreamMetadataStore() {
         this.client = StreamingUtils.getZookeeperClient();
         this.zkRoot = StreamingUtils.STREAM_ZK_ROOT;
@@ -93,9 +99,13 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void removeCubeAssignment(String cubeName) {
+        logger.trace("Remove cube assignment {}.", cubeName);
+        checkPath(cubeName);
         try {
             client.delete().forPath(ZKPaths.makePath(cubeRoot, cubeName, CUBE_ASSIGNMENT));
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when remove cube assignment " + cubeName, e);
             throw new StoreException(e);
         }
@@ -114,8 +124,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                     cubeAssignmentList.add(assignment);
                 }
             }
+            readSuccess.getAndIncrement();
             return cubeAssignmentList;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get assignments", e);
             throw new StoreException(e);
         }
@@ -125,8 +137,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     public Map<Integer, Map<String, List<Partition>>> getAllReplicaSetAssignments() {
         try {
             List<CubeAssignment> cubeAssignmentList = getAllCubeAssignments();
+            readSuccess.getAndIncrement();
             return AssignmentUtil.convertCubeAssign2ReplicaSetAssign(cubeAssignmentList);
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get assignments", e);
             throw new StoreException(e);
         }
@@ -136,8 +150,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     public Map<String, List<Partition>> getAssignmentsByReplicaSet(int replicaSetID) {
         try {
             Map<Integer, Map<String, List<Partition>>> replicaSetAssignmentsMap = getAllReplicaSetAssignments();
+            readSuccess.getAndIncrement();
             return replicaSetAssignmentsMap.get(replicaSetID);
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get assignment for replica set " + replicaSetID, e);
             throw new StoreException(e);
         }
@@ -148,12 +164,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
         try {
             String cubeAssignmentPath = getCubeAssignmentPath(cubeName);
             if (client.checkExists().forPath(cubeAssignmentPath) == null) {
+                logger.warn("Cannot find content at {}.", cubeAssignmentPath);
                 return null;
             }
             byte[] data = client.getData().forPath(cubeAssignmentPath);
+            readSuccess.getAndIncrement();
             CubeAssignment assignment = CubeAssignment.deserializeCubeAssignment(data);
             return assignment;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get cube assignment for " + cubeName, e);
             throw new StoreException(e);
         }
@@ -164,11 +183,13 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
         List<ReplicaSet> result = Lists.newArrayList();
         try {
             List<String> replicaSetIDs = client.getChildren().forPath(replicaSetRoot);
+            readSuccess.getAndIncrement();
             for (String replicaSetID : replicaSetIDs) {
                 ReplicaSet replicaSet = getReplicaSet(Integer.parseInt(replicaSetID));
                 result.add(replicaSet);
             }
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get replica sets", e);
             throw new StoreException(e);
         }
@@ -179,6 +200,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     public List<Integer> getReplicaSetIDs() {
         try {
             List<String> replicaSetIDs = client.getChildren().forPath(replicaSetRoot);
+            readSuccess.getAndIncrement();
             return Lists.transform(replicaSetIDs, new Function<String, Integer>() {
                 @Nullable
                 @Override
@@ -187,6 +209,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 }
             });
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get replica sets", e);
             throw new StoreException(e);
         }
@@ -214,11 +237,14 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 currMaxID = Collections.max(rsIDList);
             }
             int newReplicaSetID = currMaxID + 1;
+            logger.trace("Id of new replica set {} is {}.", rs, newReplicaSetID);
             rs.setReplicaSetID(newReplicaSetID);
             String replicaSetPath = ZKPaths.makePath(replicaSetRoot, String.valueOf(newReplicaSetID));
             client.create().creatingParentsIfNeeded().forPath(replicaSetPath, serializeReplicaSet(rs));
+            writeSuccess.getAndIncrement();
             return newReplicaSetID;
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when create replicaSet " + rs, e);
             throw new StoreException(e);
         }
@@ -230,7 +256,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             byte[] replicaSetData = serializeReplicaSet(rs);
             client.setData().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rs.getReplicaSetID())),
                     replicaSetData);
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("error when update replicaSet " + rs, e);
             throw new StoreException(e);
         }
@@ -240,8 +268,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     public Node getCoordinatorNode() {
         try {
             byte[] nodeData = client.getData().forPath(coordinatorRoot);
+            readSuccess.getAndIncrement();
             return JsonUtil.readValue(nodeData, Node.class);
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get coordinator leader", e);
             throw new StoreException(e);
         }
@@ -252,7 +282,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
         try {
             byte[] coordinatorBytes = JsonUtil.writeValueAsBytes(coordinator);
             client.setData().forPath(coordinatorRoot, coordinatorBytes);
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when set coordinator leader to " + coordinator, e);
             throw new StoreException(e);
         }
@@ -260,6 +292,8 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void saveSourceCheckpoint(String cubeName, String segmentName, int rsID, String sourceCheckpoint) {
+        checkPath(cubeName, segmentName);
+        logger.trace("Save remote checkpoint {} {} {} with content {}.", cubeName, segmentName, rsID, sourceCheckpoint);
         try {
             String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_SRC_CHECKPOINT, segmentName,
                     String.valueOf(rsID));
@@ -269,7 +303,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 logger.warn("Checkpoint path already existed under path {}, overwrite with new one.", path);
             }
             client.setData().forPath(path, Bytes.toBytes(sourceCheckpoint));
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when save remote checkpoint for " + cubeName + " " + segmentName , e);
             throw new StoreException(e);
         }
@@ -293,8 +329,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 String sourcePos = Bytes.toString(checkpointBytes);
                 result.put(Integer.valueOf(child), sourcePos);
             }
+            readSuccess.getAndIncrement();
             return result;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error to fetch remote checkpoint for " + cubeName + " " + segmentName, e);
             throw new StoreException(e);
         }
@@ -314,9 +352,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             if (replicaSetData != null && replicaSetData.length > 0) {
                 result = JsonUtil.readValue(Bytes.toString(replicaSetData), ReplicaSet.class);
             }
-
+            readSuccess.getAndIncrement();
             return result;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get replica set " + rsID, e);
             throw new StoreException(e);
         }
@@ -326,7 +365,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     public void removeReplicaSet(int rsID) {
         try {
             client.delete().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rsID)));
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when remove replica set " + rsID, e);
             throw new StoreException(e);
         }
@@ -341,7 +382,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 Node node = Node.from(receiverName.replace('_', ':'));
                 result.add(node);
             }
+            readSuccess.getAndIncrement();
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when fetch receivers", e);
             throw new StoreException(e);
         }
@@ -351,8 +394,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     @Override
     public List<String> getCubes() {
         try {
-            return client.getChildren().forPath(cubeRoot);
+            List<String> res =  client.getChildren().forPath(cubeRoot);
+            readSuccess.getAndIncrement();
+            return res;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when fetch cubes", e);
             throw new StoreException(e);
         }
@@ -360,12 +406,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void addStreamingCube(String cube) {
+        checkPath(cube);
         try {
             String path = ZKPaths.makePath(cubeRoot, cube);
             if (client.checkExists().forPath(path) == null) {
                 client.create().creatingParentsIfNeeded().forPath(path);
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when add cube " + cube, e);
             throw new StoreException(e);
         }
@@ -373,12 +422,16 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void removeStreamingCube(String cube) {
+        logger.trace("Remove cube {}", cube);
+        checkPath(cube);
         try {
             String path = ZKPaths.makePath(cubeRoot, cube);
             if (client.checkExists().forPath(path) != null) {
                 client.delete().deletingChildrenIfNeeded().forPath(ZKPaths.makePath(cubeRoot, cube));
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when remove cube " + cube, e);
             throw new StoreException(e);
         }
@@ -390,6 +443,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             String path = getCubeConsumeStatePath(cube);
             if (client.checkExists().forPath(path) != null) {
                 byte[] cubeInfoData = client.getData().forPath(path);
+                readSuccess.getAndIncrement();
                 if (cubeInfoData != null && cubeInfoData.length > 0) {
                     return JsonUtil.readValue(cubeInfoData, StreamingCubeConsumeState.class);
                 } else {
@@ -399,6 +453,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 return StreamingCubeConsumeState.RUNNING;
             }
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Error when get streaming cube consume state " + cube, e);
             throw new StoreException(e);
         }
@@ -406,6 +461,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void saveStreamingCubeConsumeState(String cube, StreamingCubeConsumeState state) {
+        checkPath(cube);
         try {
             String path = getCubeConsumeStatePath(cube);
             if (client.checkExists().forPath(path) != null) {
@@ -413,7 +469,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             } else {
                 client.create().creatingParentsIfNeeded().forPath(path, JsonUtil.writeValueAsBytes(state));
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when save streaming cube consume state " + cube + " with " + state, e);
             throw new StoreException(e);
         }
@@ -421,12 +479,17 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void addReceiver(Node receiver) {
+        logger.trace("Add {}.", receiver);
         try {
             String receiverPath = ZKPaths.makePath(receiverRoot, receiver.toNormalizeString());
             if (client.checkExists().forPath(receiverPath) == null) {
                 client.create().creatingParentsIfNeeded().forPath(receiverPath);
+            } else {
+                logger.warn("{} exists.", receiverPath);
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when add new receiver " + receiver, e);
             throw new StoreException(e);
         }
@@ -434,12 +497,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void removeReceiver(Node receiver) {
+        logger.trace("Remove {}.", receiver);
         try {
             String receiverPath = ZKPaths.makePath(receiverRoot, receiver.toNormalizeString());
             if (client.checkExists().forPath(receiverPath) != null) {
                 client.delete().deletingChildrenIfNeeded().forPath(receiverPath);
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Error when remove receiver " + receiver, e);
             throw new StoreException(e);
         }
@@ -447,7 +513,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void saveNewCubeAssignment(CubeAssignment newCubeAssignment) {
-        logger.info("Try saving new cube assignment for: {}.", newCubeAssignment);
+        logger.trace("Try saving new cube assignment for: {}.", newCubeAssignment);
         try {
             String path = getCubeAssignmentPath(newCubeAssignment.getCubeName());
             if (client.checkExists().forPath(path) == null) {
@@ -456,7 +522,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             } else {
                 client.setData().forPath(path, CubeAssignment.serializeCubeAssignment(newCubeAssignment));
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Fail to save cube assignment", e);
             throw new StoreException(e);
         }
@@ -472,6 +540,8 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void addCompleteReplicaSetForSegmentBuild(String cubeName, String segmentName, int rsID) {
+        logger.trace("Add completed rs {} to {} {}", rsID, cubeName, segmentName);
+        checkPath(cubeName, segmentName);
         try {
             String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName, "replica_sets",
                     String.valueOf(rsID));
@@ -480,7 +550,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             } else {
                 logger.warn("ReplicaSet id {} existed under path {}", rsID, path);
             }
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Fail to add replicaSet Id to segment build state for " + segmentName + " " + rsID, e);
             throw new StoreException(e);
         }
@@ -488,11 +560,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public void updateSegmentBuildState(String cubeName, String segmentName, SegmentBuildState.BuildState state) {
+        logger.trace("Update {} {} to state {}", cubeName, segmentName, state);
+        checkPath(cubeName, segmentName);
         try {
             String stateStr = JsonUtil.writeValueAsString(state);
             String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName);
             client.setData().forPath(path, Bytes.toBytes(stateStr));
+            writeSuccess.getAndIncrement();
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Fail to update segment build state for " + segmentName + " to " + state, e);
             throw new StoreException(e);
         }
@@ -506,6 +582,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
                 return Lists.newArrayList();
             }
             List<String> segments = client.getChildren().forPath(cubePath);
+            readSuccess.getAndIncrement();
             List<SegmentBuildState> result = Lists.newArrayList();
             for (String segment : segments) {
                 SegmentBuildState segmentState = doGetSegmentBuildState(cubePath, segment);
@@ -513,6 +590,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             }
             return result;
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Fail to get segment build states " + cubeName, e);
             throw new StoreException(e);
         }
@@ -524,6 +602,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
             String cubePath = getCubeBuildStatePath(cubeName);
             return doGetSegmentBuildState(cubePath, segmentName);
         } catch (Exception e) {
+            readFail.getAndIncrement();
             logger.error("Fail to get segment build state for " + cubeName + " " +segmentName, e);
             throw new StoreException(e);
         }
@@ -533,6 +612,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
         SegmentBuildState segmentState = new SegmentBuildState(segmentName);
         String segmentPath = ZKPaths.makePath(cubePath, segmentName);
         byte[] stateBytes = client.getData().forPath(segmentPath);
+        readSuccess.getAndIncrement();
         SegmentBuildState.BuildState state;
         if (stateBytes != null && stateBytes.length > 0) {
             String stateStr = Bytes.toString(stateBytes);
@@ -549,16 +629,20 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
 
     @Override
     public boolean removeSegmentBuildState(String cubeName, String segmentName) {
+        logger.trace("Remove {} {}", cubeName, segmentName);
+        checkPath(cubeName, segmentName);
         try {
             String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName);
             if (client.checkExists().forPath(path) != null) {
                 client.delete().deletingChildrenIfNeeded().forPath(path);
+                writeSuccess.getAndIncrement();
                 return true;
             } else {
                 logger.warn("Cube segment deep store state does not exisit!, path {} ", path);
                 return false;
             }
         } catch (Exception e) {
+            writeFail.getAndIncrement();
             logger.error("Fail to remove cube segment deep store state " + cubeName + " " + segmentName, e);
             throw new StoreException(e);
         }
@@ -575,4 +659,30 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     private String getCubeConsumeStatePath(String cubeName) {
         return ZKPaths.makePath(cubeRoot, cubeName, CUBE_CONSUME_STATE);
     }
+
+    static final String reportTemplate = "[StreamMetadataStoreStats]  read : {} ; write: {} ; read failed: {} ; write failed: {} ."; // SLF4J pattern; argument order: read ok, write ok, read failed, write failed
+    private final AtomicLong lastReport = new AtomicLong(); // epoch millis stored by reportStat() after it logs; AtomicLong() starts at 0
+    private static final long REPORT_DURATION = 300L * 1000; // 300 s in millis, compared against System.currentTimeMillis() deltas
+
+    /** Logs the read/write counters: WARN on every call after any failure was counted, else DEBUG at most once per REPORT_DURATION. */
+    @Override
+    public void reportStat() {
+        boolean anyFailure = writeFail.get() > 0 || readFail.get() > 0;
+        if (anyFailure) {
+            logger.warn(reportTemplate, readSuccess.get(), writeSuccess.get(), readFail.get(), writeFail.get());
+        } else if (System.currentTimeMillis() - lastReport.get() < REPORT_DURATION) {
+            return; // quiet period not over: log nothing and leave lastReport untouched
+        } else {
+            logger.debug(reportTemplate, readSuccess.get(), writeSuccess.get(), readFail.get(), writeFail.get());
+        }
+        lastReport.set(System.currentTimeMillis());
+    }
+
+    private void checkPath(String... paths) { // guard used before the segments are joined into a ZooKeeper node path
+        for (String segment : paths) {
+            if (segment == null || segment.isEmpty()) { // one null or empty segment rejects the whole call
+                throw new IllegalArgumentException("Illegal zookeeper path.");
+            }
+        }
+    }
 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
index 8cf3e60..f37469d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
@@ -18,10 +18,13 @@
 
 package org.apache.kylin.stream.coordinator.assign;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
@@ -30,21 +33,32 @@ import org.apache.kylin.stream.coordinator.StreamMetadataStore;
 import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.kylin.stream.core.model.CubeAssignment;
 import org.apache.kylin.stream.core.model.ReplicaSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Caches the replica sets assigned to each cube so that repeated lookups
+ * do not have to read them from the {@link StreamMetadataStore} every time.
+ */
 public class AssignmentsCache {
-    private static volatile AssignmentsCache instance = new AssignmentsCache();
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentsCache.class);
+
+    private static final AssignmentsCache instance = new AssignmentsCache();
     private static final String ASSIGNMENT_ENTITY = "cube_assign";
     private StreamMetadataStore metadataStore;
-    private ConcurrentMap<String, List<ReplicaSet>> cubeAssignmentCache;
+    private Cache<String, List<ReplicaSet>> cubeAssignmentCache;
 
     private AssignmentsCache() {
         this.metadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        cubeAssignmentCache = Maps.newConcurrentMap();
-        Broadcaster.getInstance(config).registerListener(new AssignCacheSyncListener(), ASSIGNMENT_ENTITY);
+        cubeAssignmentCache = CacheBuilder.newBuilder() // Guava cache takes over from the former ConcurrentMap
+                .removalListener((RemovalNotification<String, List<ReplicaSet>> notification) -> logger
+                        .debug("{} is removed because {} ", notification.getKey(), notification.getCause()))
+                .expireAfterWrite(300, TimeUnit.SECONDS) // an entry expires 300 s after it was written, so the next lookup reloads it
+                .build();
+        Broadcaster.getInstance(config).registerListener(new AssignCacheSyncListener(), ASSIGNMENT_ENTITY);
     }
 
     public static AssignmentsCache getInstance() {
@@ -52,32 +66,38 @@ public class AssignmentsCache {
     }
 
     public List<ReplicaSet> getReplicaSetsByCube(String cubeName) {
-        if (cubeAssignmentCache.get(cubeName) == null) {
-            synchronized (cubeAssignmentCache) {
-                if (cubeAssignmentCache.get(cubeName) == null) {
-                    List<ReplicaSet> result = Lists.newArrayList();
-
-                    CubeAssignment assignment = metadataStore.getAssignmentsByCube(cubeName);
-                    for (Integer replicaSetID : assignment.getReplicaSetIDs()) {
-                        result.add(metadataStore.getReplicaSet(replicaSetID));
-                    }
-                    cubeAssignmentCache.put(cubeName, result);
+        List<ReplicaSet> ret ; // replica sets assigned to cubeName: served from the cache, or loaded from metadataStore on a miss
+        try {
+            ret = cubeAssignmentCache.get(cubeName, () -> { // Guava runs this loader only when cubeName has no cached value
+                List<ReplicaSet> result = Lists.newArrayList();
+                CubeAssignment assignment = metadataStore.getAssignmentsByCube(cubeName);
+                if(assignment == null){
+                    logger.error("Inconsistent metadata for assignment of {}, do check it.", cubeName);
+                    return result; // NOTE(review): this empty list becomes the cached value until it expires or is invalidated
+                }
+                for (Integer replicaSetID : assignment.getReplicaSetIDs()) {
+                    result.add(metadataStore.getReplicaSet(replicaSetID));
                 }
-            }
+                logger.trace("Update assignment with {}", result);
+                return result;
+            });
+        } catch (ExecutionException e){ // checked loader failures only; Guava rethrows unchecked ones as UncheckedExecutionException
+            logger.warn("Failed to load CubeAssignment", e);
+            throw new IllegalStateException("Failed to load CubeAssignment", e);
         }
-        return cubeAssignmentCache.get(cubeName);
+        return ret;
     }
 
     public void clearCubeCache(String cubeName) {
         Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()).announce(ASSIGNMENT_ENTITY,
                 Broadcaster.Event.UPDATE.getType(), cubeName);
-        cubeAssignmentCache.remove(cubeName);
+        cubeAssignmentCache.invalidate(cubeName); // drop the local entry; NOTE(review): the announce above presumably triggers the same on peers - confirm
     }
 
     private class AssignCacheSyncListener extends Listener {
-        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
-                throws IOException {
-            cubeAssignmentCache.remove(cacheKey);
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) { // registered for ASSIGNMENT_ENTITY in the constructor
+            cubeAssignmentCache.invalidate(cacheKey); // presumably cacheKey is the cube name announced by clearCubeCache - TODO confirm
         }
     }
 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
index 02bcc09..623fd05 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
@@ -33,12 +33,16 @@ import org.apache.kylin.stream.core.model.ReplicaSet;
 import org.apache.kylin.stream.core.source.Partition;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation for Assigner, assign according to the consumer task number for each cube
  *
  */
 public class DefaultAssigner implements Assigner {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultAssigner.class);
+
     @Override
     public Map<Integer, Map<String, List<Partition>>> reBalancePlan(List<ReplicaSet> replicaSets,
             List<StreamingCubeInfo> cubes, List<CubeAssignment> existingAssignments) {
@@ -191,6 +195,11 @@ public class DefaultAssigner implements Assigner {
     private List<List<Partition>> splitCubeConsumeTasks(StreamingCubeInfo cube, int replicaSetNum) {
         List<Partition> partitionsOfCube = cube.getStreamingTableSourceInfo().getPartitions();
         int cubeConsumerTaskNum = getCubeConsumerTasks(cube, replicaSetNum);
+        int partitionsPerReceiver = partitionsOfCube.size()/cubeConsumerTaskNum ;
+        if (partitionsPerReceiver > 3 && replicaSetNum > cubeConsumerTaskNum) {
+            logger.info(
+                    "You may consider improve `kylin.stream.cube-num-of-consumer-tasks` because you still some quta left.");
+        }
         List<List<Partition>> result = Lists.newArrayListWithCapacity(cubeConsumerTaskNum);
         for (int i = 0; i < cubeConsumerTaskNum; i++) {
             result.add(Lists.<Partition>newArrayList());
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
index 6f5bb0d..b0e3e92 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
@@ -17,10 +17,10 @@
  */
 package org.apache.kylin.stream.coordinator.coordinate;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -36,6 +36,7 @@ import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
 import org.apache.kylin.stream.core.model.CubeAssignment;
 import org.apache.kylin.stream.core.model.SegmentBuildState;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -106,6 +108,7 @@ public class BuildJobSubmitter implements Runnable {
                 buildInfos = previousValue;
             }
         }
+        logger.trace("Add job {} of segment [{} - {}] to track.", segmentBuildJob.jobID, segmentBuildJob.cubeName, segmentBuildJob.segmentName);
         boolean addSucceed = buildInfos.add(segmentBuildJob);
         if (!addSucceed) {
             logger.debug("Add {} failed because we have a duplicated one.", segmentBuildJob);
@@ -136,12 +139,15 @@ public class BuildJobSubmitter implements Runnable {
 
     void doRun() {
         checkTimes++;
-        logger.info("\n----------------------------------------------------------- {}", checkTimes);
+        logger.debug("\n========================================================================= {}", checkTimes);
+        dumpSegmentBuildJobCheckList();
+        coordinator.getStreamMetadataStore().reportStat();
         List<SegmentJobBuildInfo> successJobs = traceEarliestSegmentBuildJob();
 
         for (SegmentJobBuildInfo successJob : successJobs) {
             ConcurrentSkipListSet<SegmentJobBuildInfo> submittedBuildJobs = segmentBuildJobCheckList
                     .get(successJob.cubeName);
+            logger.trace("Remove job {} from check list.", successJob.jobID);
             submittedBuildJobs.remove(successJob);
         }
 
@@ -170,8 +176,11 @@ public class BuildJobSubmitter implements Runnable {
     @NonSideEffect
     List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
         List<SegmentJobBuildInfo> successJobs = Lists.newArrayList();
-        for (ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos : segmentBuildJobCheckList.values()) {
+        for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> entry :
+                segmentBuildJobCheckList.entrySet()) {
+            ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos = entry.getValue();
             if (buildInfos.isEmpty()) {
+                logger.trace("Skip {}", entry.getKey());
                 continue;
             }
 
@@ -186,6 +195,7 @@ public class BuildJobSubmitter implements Runnable {
                     continue;
                 }
                 ExecutableState jobState = cubingJob.getStatus();
+                logger.debug("Current job state {}", jobState);
                 if (ExecutableState.SUCCEED.equals(jobState)) {
                     CubeManager cubeManager = coordinator.getCubeManager();
                     CubeInstance cubeInstance = cubeManager.getCube(segmentBuildJob.cubeName).latestCopyForWrite();
@@ -205,11 +215,10 @@ public class BuildJobSubmitter implements Runnable {
                         logger.warn("Job:{} is error, exceed max retry. Kylin admin could resume it or discard it"
                                 + "(to let new building job be sumbitted) .", segmentBuildJob);
                     }
-                } else {
-                    logger.debug("Current job state {}", jobState);
                 }
-            } catch (Exception e) {
-                logger.error("Error when check streaming segment job build state:" + segmentBuildJob, e);
+            } catch (StoreException storeEx) {
+                logger.error("Error when check streaming segment job build state:" + segmentBuildJob, storeEx);
+                throw storeEx;
             }
         }
         return successJobs;
@@ -226,7 +235,7 @@ public class BuildJobSubmitter implements Runnable {
                 boolean ok = submitSegmentBuildJob(cubeName, segmentName);
                 allSubmited = allSubmited && ok;
                 if (!ok) {
-                    logger.debug("Failed to submit building job.");
+                    logger.debug("Failed to submit building job for {}.", segmentName);
                 }
             }
             if (allSubmited) {
@@ -267,15 +276,16 @@ public class BuildJobSubmitter implements Runnable {
         List<SegmentBuildState> segmentStates = coordinator.getStreamMetadataStore().getSegmentBuildStates(cubeName);
         int inBuildingSegments = cubeInstance.getBuildingSegments().size();
         int leftQuota = allowMaxBuildingSegments - inBuildingSegments;
+        boolean stillQuotaForNewSegment = true;
 
         // Sort it so we can iterate segments from eariler one to newer one
         Collections.sort(segmentStates);
 
         for (int i = 0; i < segmentStates.size(); i++) {
-
+            boolean needRebuild = false;
             if (leftQuota <= 0) {
-                logger.info("No left quota to build segments for cube:{}", cubeName);
-                break;
+                logger.info("No left quota to build segments for cube:{} at {}", cubeName, leftQuota);
+                stillQuotaForNewSegment = false;
             }
 
             SegmentBuildState segmentState = segmentStates.get(i);
@@ -293,26 +303,24 @@ public class BuildJobSubmitter implements Runnable {
 
             // We already have a building job for current segment
             if (segmentState.isInBuilding()) {
-                boolean needRebuild = checkSegmentBuildingJob(segmentState, cubeName, cubeInstance);
+                needRebuild = checkSegmentBuildingJob(segmentState, cubeName, cubeInstance);
                 if (!needRebuild)
                     continue;
             } else if (segmentState.isInWaiting()) {
-                // The data has not been uploaded to remote completely, or job is discard
+                // The data may have been uploaded to remote completely, or the job was discarded
                 // These two case should be submit a building job, just let go through it
             }
 
             boolean readyToBuild = checkSegmentIsReadyToBuild(segmentStates, i, cubeAssignedReplicaSets);
             if (!readyToBuild) {
                 logger.debug("Segment {} {} is not ready to submit a building job.", cubeName, segmentState);
-                // use break instead continue here, because we should transfer to next queue in sequential way (no jump the queue)
-                break;
-            } else {
+            } else if (stillQuotaForNewSegment || needRebuild) {
                 result.add(segmentState.getSegmentName());
                 leftQuota--;
             }
         }
-        if (logger.isDebugEnabled() && result.isEmpty()) {
-            logger.debug("Candidate {} : {}.", cubeName, String.join(", ", result));
+        if (logger.isDebugEnabled() && !result.isEmpty()) {
+            logger.debug("{} Candidate segment list to be built : {}.", cubeName, String.join(", ", result));
         }
         return result;
     }
@@ -364,13 +372,14 @@ public class BuildJobSubmitter implements Runnable {
                 }
             }
 
-            if (!segmentExists) {
-                logger.debug("Create segment for {} {} .", cubeName, segmentName);
-                newSeg = coordinator.getCubeManager().appendSegment(cubeInstance,
-                        new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond()));
-            } else {
-                logger.info("Segment {} exists.", segmentName);
+            if (segmentExists) {
+                logger.warn("Segment {} exists, it will be forced deleted.", segmentName);
+                coordinator.getCubeManager().updateCubeDropSegments(cubeInstance, newSeg);
             }
+            
+            logger.debug("Create segment for {} {} .", cubeName, segmentName);
+            newSeg = coordinator.getCubeManager().appendSegment(cubeInstance,
+                    new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond()));
 
             // Step 2. create and submit new build job
             DefaultChainedExecutable executable = getStreamingCubingJob(newSeg);
@@ -414,7 +423,11 @@ public class BuildJobSubmitter implements Runnable {
                 return false;
             }
             CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(jobId);
-            Preconditions.checkNotNull(cubingJob, "CubingJob should not be null.");
+            if (cubingJob == null) {
+                // Cubing job is dropped manually, or metadata is broken.
+                logger.warn("Looks like cubing job is dropped manually, it will be submitted a new one.");
+                return true;
+            }
             ExecutableState jobState = cubingJob.getStatus();
 
             // If job is already succeed and HBase segment in ready state, remove the build state
@@ -438,8 +451,13 @@ public class BuildJobSubmitter implements Runnable {
 
             // If a job is discard, we will try to resumbit it later.
             if (ExecutableState.DISCARDED.equals(jobState)) {
-                logger.info("Job:{} is discard, resubmit it later.", jobId);
-                return true;
+                if (KylinConfig.getInstanceFromEnv().isAutoResubmitDiscardJob()) {
+                    logger.debug("Job:{} is discard, resubmit it later.", jobId);
+                    return true;
+                } else {
+                    logger.debug("Job:{} is discard, please resubmit yourself.", jobId);
+                    return false;
+                }
             } else {
                 logger.info("Job:{} is in running, job state: {}.", jobId, jobState);
             }
@@ -484,7 +502,12 @@ public class BuildJobSubmitter implements Runnable {
                     }
                 }
             }
-            return notCompleteReplicaSets.isEmpty();
+            if (notCompleteReplicaSets.isEmpty()) {
+                return true;
+            } else {
+                logger.debug("Not ready data for replica sets: {}", notCompleteReplicaSets);
+                return false;
+            }
         }
     }
 
@@ -495,4 +518,16 @@ public class BuildJobSubmitter implements Runnable {
     public DefaultChainedExecutable getStreamingCubingJob(CubeSegment segment){
         return new StreamingCubingEngine().createStreamingCubingJob(segment, "SYSTEM");
     }
-}
+
+    void dumpSegmentBuildJobCheckList() {
+        if (!logger.isTraceEnabled())
+            return;
+        StringBuilder sb = new StringBuilder("Dump JobCheckList:\t");
+        for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> cube : segmentBuildJobCheckList.entrySet()) {
+            sb.append(cube.getKey()).append(":").append(cube.getValue());
+        }
+        if (logger.isTraceEnabled()) {
+            logger.trace(sb.toString());
+        }
+    }
+}
\ No newline at end of file
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
index 81b2e8c..32e5b88 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
@@ -36,6 +36,7 @@ import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
 import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent;
 import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
 import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
 import org.apache.kylin.stream.core.model.AssignRequest;
 import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
@@ -72,7 +73,7 @@ import java.util.stream.Collectors;
  *
  * In a multi-step transcation, following steps should be thought twice:
  *  1. should fail fast or continue when exception thrown.
- *  2. should API(remote call) be synchronous or asynchronous
+ *  2. should API(RPC) be synchronous or asynchronous
  *  3. when transcation failed, will roll back always succeed
  *  4. transcation should be idempotent so when it failed, it could be fixed by retry
  * </pre>
@@ -151,10 +152,10 @@ public class ReceiverClusterManager {
 
     /**
      * <pre>
-     * Reassign action is a process which move some consumption task from some replica set to new replica set.
+     * Reassign action is a process which moves some consumption tasks from some replica sets to some new replica sets.
      *
      * It is necessary in some case such as :
-     *  - new topic partition was added
+     *  - new topic partition was added, or receivers cannot keep up with the produce rate, so we need to scale out
      *  - wordload not balance between different replica set
      *  - some nodes have to be offlined so the consumption task have be transfered
      * </pre>
@@ -470,15 +471,15 @@ public class ReceiverClusterManager {
      *
      * @param partitions specific topic partitions which replica set should consume
      * @param startConsumer should receiver start consumption at once
-     * @param mustAllSucceed if set to true, we should ensure all receivers has been correctly notified; false
+     * @param failFast if set to true, we should ensure all receivers have been correctly notified (fail-fast); false
      *                       for ensure at least one receivers has been correctly notified
      *
      * @throws IOException throwed when assign failed
      */
     @NotAtomicIdempotent
     void assignCubeToReplicaSet(ReplicaSet rs, String cubeName, List<Partition> partitions, boolean startConsumer,
-            boolean mustAllSucceed) throws IOException {
-        boolean hasNodeAssigned = false;
+            boolean failFast) throws IOException {
+        boolean atLeastOneAssigned = false;
         IOException exception = null;
         AssignRequest assignRequest = new AssignRequest();
         assignRequest.setCubeName(cubeName);
@@ -487,16 +488,16 @@ public class ReceiverClusterManager {
         for (final Node node : rs.getNodes()) {
             try {
                 getCoordinator().assignToReceiver(node, assignRequest);
-                hasNodeAssigned = true;
+                atLeastOneAssigned = true;
             } catch (IOException e) {
-                if (mustAllSucceed) {
+                if (failFast) {
                     throw e;
                 }
                 exception = e;
-                logger.error("cube:" + cubeName + " consumers start fail for node:" + node.toString(), e);
+                logger.error("Cube:" + cubeName + " consumers start fail for node:" + node.toString(), e);
             }
         }
-        if (!hasNodeAssigned) {
+        if (!atLeastOneAssigned) {
             if (exception != null) {
                 throw exception;
             }
@@ -506,11 +507,12 @@ public class ReceiverClusterManager {
     /**
      * When a segment build job succeed, we should do some following job to deliver it to historical part.
      *
+     * @throws org.apache.kylin.stream.coordinator.exception.StoreException thrown when write metadata failed
      * @return true if promote succeed, else false
      */
     @NotAtomicIdempotent
     protected boolean segmentBuildComplete(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment,
-            SegmentJobBuildInfo segmentBuildInfo) throws IOException {
+            SegmentJobBuildInfo segmentBuildInfo) {
         String cubeName = segmentBuildInfo.cubeName;
         String segmentName = segmentBuildInfo.segmentName;
 
@@ -521,8 +523,14 @@ public class ReceiverClusterManager {
         }
 
         if (!SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) {
-            promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
+            try {
+                promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
+            } catch (IOException storeException) {
+                throw new StoreException("Promote failed because of metadata store.", storeException);
+            }
             logger.debug("Promote {} succeed.", segmentName);
+        } else {
+            logger.debug("Segment status is: {}", cubeSegment.getStatus());
         }
 
         // Step 2. delete local segment files in receiver side because these are useless now
@@ -533,8 +541,9 @@ public class ReceiverClusterManager {
             ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
             for (Node node : rs.getNodes()) {
                 try {
-                    getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName);
+                    getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName); // Idempotent
                 } catch (IOException e) {
+                    // It is OK to just print log, unused segment cache in receiver will be deleted in next call
                     logger.error("error when remove cube segment for receiver:" + node, e);
                 }
             }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
index 7c3c575..f234c2e 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -113,7 +113,7 @@ public class StreamingCoordinator implements CoordinatorClient {
 
     private StreamingCoordinator() {
         this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
-        clusterManager = new ReceiverClusterManager(this);
+        this.clusterManager = new ReceiverClusterManager(this);
         this.receiverAdminClient = new HttpReceiverAdminClient();
         this.assigner = getAssigner();
         this.zkClient = StreamingUtils.getZookeeperClient();
@@ -195,7 +195,7 @@ public class StreamingCoordinator implements CoordinatorClient {
                 for (Node receiver : rs.getNodes()) {
                     try {
                         unAssignToReceiver(receiver, request);
-                    } catch (Exception e) {
+                    } catch (IOException e) {
                         logger.error("Exception throws when unAssign receiver", e);
                         unAssignedFailReceivers.add(receiver);
                     }
@@ -297,6 +297,7 @@ public class StreamingCoordinator implements CoordinatorClient {
         }
     }
 
+    @NotAtomicIdempotent
     private void doAssignCube(String cubeName, CubeAssignment assignment) {
         Set<ReplicaSet> successRS = Sets.newHashSet();
         try {
@@ -306,11 +307,11 @@ public class StreamingCoordinator implements CoordinatorClient {
                         false);
                 successRS.add(rs);
             }
-            logger.debug("Committing assign {} transaction.", cubeName);
+            logger.debug("Committing assignment {} transaction.", cubeName);
             streamMetadataStore.saveNewCubeAssignment(assignment);
-            logger.debug("Committed assign {} transaction.", cubeName);
+            logger.debug("Committed assignment {} transaction.", cubeName);
         } catch (Exception e) {
-            // roll back the success group assignment
+            logger.debug("Starting roll back success receivers.");
             for (ReplicaSet rs : successRS) {
 
                 UnAssignRequest request = new UnAssignRequest();
@@ -350,6 +351,7 @@ public class StreamingCoordinator implements CoordinatorClient {
         int replicaSetID = streamMetadataStore.createReplicaSet(rs);
         try {
             for (Node receiver : rs.getNodes()) {
+                logger.trace("Notify {} that it has been added to {} .", receiver, replicaSetID);
                 addReceiverToReplicaSet(receiver, replicaSetID);
             }
         } catch (IOException e) {
@@ -379,7 +381,7 @@ public class StreamingCoordinator implements CoordinatorClient {
         for (ReplicaSet other : allReplicaSet) {
             if (other.getReplicaSetID() != replicaSetID) {
                 if (other.getNodes().contains(receiver)) {
-                    logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
+                    logger.error("Error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
                             replicaSetID, other.getReplicaSetID());
                     throw new IllegalStateException("Node exists in ReplicaSet!");
                 }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
index 2c4fce2..5177f78 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
@@ -18,17 +18,65 @@
 
 package org.apache.kylin.stream.coordinator.doctor;
 
+import org.apache.hadoop.hbase.util.Threads;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
- * <pre>
- * Basic step of this class:
- *  1. stop coordinator to avoid underlying concurrency issue
- *  2. check inconsistent state of all receiver cluster
- *  3. send summary via mail to kylin admin
- *  4. if need, call ClusterDoctor to repair inconsistent issue
- * </pre>
+ *
+ * <h2>Something need to check</h2>
+ * <dl>
+ *     <dt>Zookeeper Availability</dt>
+ *     <dd>Check availability</dd>
+ *
+ *     <dt>Receiver Availability</dt>
+ *     <dd>Check availability</dd>
+ *
+ *     <dt>Metadata RW failure</dt>
+ *     <dd>Coordinator writes consistent state into metadata; statistics of r/w failures
+ *      are an important indicator of cluster health</dd>
+ *
+ *     <dt>RPC failure</dt>
+ *     <dd>Coordinator sends requests to streaming receivers; statistics
+ *      of failures are an important indicator of cluster health</dd>
+ *
+ *     <dt>Segment Build Job &amp; Promotion Failure</dt>
+ *
+ *
+ *     <dt>Cube Assignment Consistency</dt>
+ *     <dd>If a receiver's behavior is not aligned with central metadata, it indicates that something is wrong.</dd>
+ *
+ *     <dt>Active segments Count &amp; Immutable segments Count</dt>
+ *     <dd>If receivers have too many active segments, it indicates that promotion is blocked;
+ *      for each query a receiver gets, it has to scan too many segment/fragment files,
+ *      so performance will be impacted badly.  </dd>
+ *
+ *     <dt>Consume lag</dt>
+ *     <dd>If receivers cannot keep up with the producer's rate, many active segments will accumulate,
+ *      and performance will be impacted badly.  </dd>
+ *
+ * </dl>
+ *
+ * <h2>Check and Report</h2>
+ * Basic step:
+ * <ol>
+ *  <li> stop coordinator to avoid underlying concurrency issue </li>
+ *  <li> check inconsistent state of all receiver cluster </li>
+ *  <li> send summary via mail to kylin admin </li>
+ *  <li> if need, call ClusterDoctor to repair inconsistent issue </li>
+ * </ol>
+ *
  * @see org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent
  * @see ClusterDoctor
  */
 public class ClusterStateChecker {
-    // TO BE IMPLEMENTED
+
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 10, 20, TimeUnit.MINUTES,
+            new LinkedBlockingQueue<Runnable>(10 * 100), //
+            Threads.newDaemonThreadFactory("Cluster-checker-"));
+
+
+
+
 }
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
index 044534c..fa9d19e 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -90,12 +91,12 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
         assertEquals(1, buildJobSubmitter.getCubeCheckList().size());
     }
 
-    @Test
+    @Test(expected = StoreException.class)
     @SuppressWarnings("unchecked")
-    public void testTraceEarliestSegmentBuildJob2() throws IOException {
+    public void testTraceEarliestSegmentBuildJob2() {
         beforeTestTraceEarliestSegmentBuildJob();
         when(clusterManager.segmentBuildComplete(isA(CubingJob.class), isA(CubeInstance.class), isA(CubeSegment.class),
-                isA(SegmentJobBuildInfo.class))).thenThrow(IOException.class);
+                isA(SegmentJobBuildInfo.class))).thenThrow(StoreException.class);
         BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
         buildJobSubmitter.restore();
         List<SegmentJobBuildInfo> jobList = buildJobSubmitter.traceEarliestSegmentBuildJob();
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
index c7b1ac5..408173a 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
@@ -262,7 +262,6 @@ public class StreamingTestBase extends LocalFileMetadataTestCase {
         ReceiverClusterManager clusterManager = mock(ReceiverClusterManager.class);
         when(clusterManager.getCoordinator()).thenReturn(coordinator);
         return clusterManager;
-        // return new ReceiverClusterManager(coordinator);
     }
 
     KylinConfig stubKylinConfig() {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
index b040691..9007d35 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
@@ -35,7 +35,6 @@ import org.apache.kylin.stream.core.model.stats.ReceiverStats;
 /**
  * StreamingCoordinator send admin request to speicifc receiver
  * (received by org.apache.kylin.stream.server.rest.controller.AdminController).
- *
  */
 public interface ReceiverAdminClient {
 
@@ -103,6 +102,10 @@ public interface ReceiverAdminClient {
 
     /**
      * Ask receiver to stop consumption and convert all segments to Immutable.
+     *
+     * If a replica set is removed from consumption task, coordinator will notify
+     *    its receivers and ask them to upload all data asap.
+     * This often happens during a reassign action.
      */
     void makeCubeImmutable(Node receiver, String cubeName) throws IOException;
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index 256b519..f5d70e3 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -80,6 +80,7 @@ public class StreamingConsumerChannel implements Runnable {
 
     public void start() {
         this.consumerThread = new Thread(this, cubeName + "_channel");
+        consumerThread.setPriority(Thread.MAX_PRIORITY); // Improve the priority of consumer thread to make ingest rate stable
         connector.open();
         consumerThread.start();
     }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
index 10e7b82..52658d8 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
@@ -28,20 +28,33 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
 import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
 import org.apache.kylin.dimension.TimeDerivedColumnType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimeZone;
 
 /**
  * Scan planner for Streaming data segments, take derived time columns into consideration.
  */
 public class StreamingDataQueryPlanner {
 
+    private static Logger logger = LoggerFactory.getLogger(StreamingDataQueryPlanner.class);
+
     protected CubeDesc cubeDesc;
     protected TupleFilter filter;
     protected TupleFilter flatFilter;
+    private final long timezoneOffset;
 
     public StreamingDataQueryPlanner(CubeDesc cubeDesc, TupleFilter filter) {
         this.cubeDesc = cubeDesc;
         this.filter = filter;
         this.flatFilter = flattenToOrAndFilter(filter);
+        String timezoneName = cubeDesc.getConfig().getStreamingDerivedTimeTimezone();
+        if (timezoneName == null || timezoneName.length() == 0) {
+            timezoneOffset = 0;
+        } else {
+            timezoneOffset = TimeZone.getTimeZone(timezoneName).getRawOffset();
+        }
     }
 
     public boolean canSkip(long timeStart, long timeEnd) {
@@ -81,7 +94,7 @@ public class StreamingDataQueryPlanner {
             TimeDerivedColumnType timeDerivedColumnType = TimeDerivedColumnType.getTimeDerivedColumnType(column
                     .getName());
 
-            CheckResult checkResult = timeRangeChecker.check(comp, timeDerivedColumnType);
+            CheckResult checkResult = timeRangeChecker.check(comp, timeDerivedColumnType, timezoneOffset);
             if (checkResult == CheckResult.EXCLUDED) {
                 return true;
             } else {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
index 110cc8e..87e4c5f 100755
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
@@ -55,8 +55,8 @@ public class StreamingTupleConverter {
 
     final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
     final List<Integer> advMeasureIndexInGTValues;
-    final boolean autoTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
-    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+    final boolean autoTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
+    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
             .getRawOffset();
 
     public StreamingTupleConverter(ResponseResultSchema schema, TupleInfo returnTupleInfo) {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
index 3e27d89..5afef78 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
@@ -20,6 +20,9 @@ package org.apache.kylin.stream.core.source;
 
 import org.apache.kylin.stream.core.model.StreamingMessage;
 
+/**
+ * Extension point for parsing messages
+ */
 public interface IStreamingMessageParser<T> {
     StreamingMessage parse(T sourceMessage);
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
index 9b0a912..c2f7641 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Dictionary;
@@ -79,6 +80,11 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
 
     @Override
     public void search(StreamingSearchContext searchContext, ResultCollector collector) throws IOException {
+        String timezone = searchContext.getCubeDesc().getConfig().getStreamingDerivedTimeTimezone();
+        long timezoneOffset = 0;
+        if (timezone != null && timezone.length() > 0) {
+            timezoneOffset = TimeZone.getTimeZone(timezone).getRawOffset();
+        }
         FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo();
         CuboidMetaInfo cuboidMetaInfo;
         if (searchContext.hitBasicCuboid()) {
@@ -116,8 +122,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
         Set<TblColRef> unEvaluateDims = Sets.newHashSet();
         TupleFilter fragmentFilter = null;
         if (searchContext.getFilter() != null) {
-            fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec,
-                    dimensions, new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims);
+            fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec, dimensions,
+                    new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims, timezoneOffset);
         }
         if (ConstantTupleFilter.TRUE == fragmentFilter) {
             fragmentFilter = null;
@@ -134,8 +140,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
     }
 
     private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, TupleFilter rootFilter,
-                                      ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
-                                      final Set<TblColRef> unEvaluableColumnCollector) {
+            ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
+            final Set<TblColRef> unEvaluableColumnCollector, long timezoneOffset) {
         Map<TblColRef, Integer> colMapping = Maps.newHashMap();
         for (int i = 0; i < dimensions.length; i++) {
             colMapping.put(dimensions[i], i);
@@ -147,6 +153,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
         filter = builtInFunctionTransformer.transform(filter);
         FragmentFilterConverter fragmentFilterConverter = new FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector,
                 colMapping, recordCodec);
+        fragmentFilterConverter.setTimezoneOffset(timezoneOffset);
         filter = fragmentFilterConverter.transform(filter);
 
         filter = new FilterOptimizeTransformer().transform(filter);
@@ -159,6 +166,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
         private CompareFilterTimeRangeChecker filterTimeRangeChecker;
         private ColumnarRecordCodec recordCodec;
         transient ByteBuffer buf;
+        private long timezoneOffset = 0;
 
         public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, Set<TblColRef> unEvaluableColumnCollector,
                                        Map<TblColRef, Integer> colMapping, ColumnarRecordCodec recordCodec) {
@@ -225,7 +233,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
 
             if (TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && filterTimeRangeChecker != null) {
                 CheckResult checkResult = filterTimeRangeChecker.check(oldCompareFilter,
-                        TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()));
+                        TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()), timezoneOffset);
                 if (checkResult == CheckResult.INCLUDED) {
                     return ConstantTupleFilter.TRUE;
                 } else if (checkResult == CheckResult.EXCLUDED) {
@@ -354,7 +362,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
             }
         }
 
-
+        public void setTimezoneOffset(long timezoneOffset) {
+            this.timezoneOffset = timezoneOffset;
+        }
     }
-
 }
\ No newline at end of file
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
index c01459d..52472b2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
@@ -57,10 +57,17 @@ public class CompareFilterTimeRangeChecker {
         this.endClose = endClose;
     }
 
-    public CheckResult check(CompareTupleFilter compFilter, TimeDerivedColumnType timeDerivedColumnType) {
+    public CheckResult check(CompareTupleFilter compFilter, TimeDerivedColumnType timeDerivedColumnType, long timezoneOffset) {
         Object timestampValue = compFilter.getFirstValue();
         Set conditionValues = compFilter.getValues();
-        Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+        Pair<Long, Long> timeUnitRange;
+        if (timeDerivedColumnType != TimeDerivedColumnType.MINUTE_START
+                && timeDerivedColumnType != TimeDerivedColumnType.HOUR_START) {
+            timeUnitRange = timezoneOffset == 0 ? timeDerivedColumnType.getTimeUnitRange(timestampValue)
+                    : timeDerivedColumnType.getTimeUnitRangeTimezoneAware(timestampValue, timezoneOffset);
+        } else {
+            timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+        }
         switch (compFilter.getOperator()) {
         case EQ:
             return checkForEqValue(timeUnitRange);
@@ -105,7 +112,7 @@ public class CompareFilterTimeRangeChecker {
             }
             return CheckResult.OVERLAP;
         case IN:
-            return checkForInValues(timeDerivedColumnType, conditionValues);
+            return checkForInValues(timeDerivedColumnType, conditionValues, timezoneOffset);
         default:
             return CheckResult.OVERLAP;
         }
@@ -121,10 +128,10 @@ public class CompareFilterTimeRangeChecker {
         return CheckResult.OVERLAP;
     }
 
-    private CheckResult checkForInValues(TimeDerivedColumnType timeDerivedColumnType, Collection<Object> values) {
+    private CheckResult checkForInValues(TimeDerivedColumnType timeDerivedColumnType, Collection<Object> values, long timezoneOffset) {
         CheckResult result = null;
         for (Object timestampValue : values) {
-            Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+            Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRangeTimezoneAware(timestampValue, timezoneOffset);
             CheckResult checkResult = checkForEqValue(timeUnitRange);
             if (result == null) {
                 result = checkResult;
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 4a504aa..cb1bd6b 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.TreeMap;
 
 import org.apache.commons.lang3.StringUtils;
@@ -65,6 +66,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
     private String tsColName = "timestamp";
     private String tsParser = null;
     private AbstractTimeParser streamTimeParser;
+    private long timeZoneOffset = 0;
 
     /**
      * the path of {"user" : {"name": "kite", "sex":"female"}}
@@ -76,6 +78,9 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
 
     public TimedJsonStreamParser(CubeDesc cubeDesc, MessageParserInfo parserInfo) {
         this(new CubeJoinedFlatTableDesc(cubeDesc).getAllColumns(), parserInfo);
+        String timeZone = cubeDesc.getConfig().getStreamingDerivedTimeTimezone();
+        if(timeZone.length() > 0)
+            timeZoneOffset = TimeZone.getTimeZone(timeZone).getRawOffset();
     }
 
     public TimedJsonStreamParser(List<TblColRef> cols, MessageParserInfo parserInfo) {
@@ -144,7 +149,11 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
                 String columnName = column.getName();
                 TimeDerivedColumnType columnType = TimeDerivedColumnType.getTimeDerivedColumnType(columnName);
                 if (columnType != null) {
-                    result.add(String.valueOf(columnType.normalize(t)));
+                    if (timeZoneOffset > 0 && TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
+                        result.add(String.valueOf(columnType.normalize(t + timeZoneOffset)));
+                    } else {
+                        result.add(String.valueOf(columnType.normalize(t)));
+                    }
                 } else {
                     Object value = root.get(columnName.toLowerCase(Locale.ROOT));
                     if (value == null) {