You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/27 14:05:52 UTC

[2/6] kylin git commit: APACHE-KYLIN-2723: refine query & job metrics

APACHE-KYLIN-2723: refine query & job metrics

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ec897aae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ec897aae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ec897aae

Branch: refs/heads/master
Commit: ec897aaeff90708b7f87b180a807fe766faf05c2
Parents: ff0d0ed
Author: Wang Ken <mi...@ebay.com>
Authored: Thu Aug 10 19:33:07 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Oct 27 21:58:08 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 +
 .../org/apache/kylin/common/QueryContext.java   | 151 +++++++++++++++---
 .../org/apache/kylin/common/util/JsonUtil.java  |  11 +-
 core-job/pom.xml                                |   4 +
 .../kylin/job/metrics/JobMetricsFacade.java     | 142 +++++++++++++++++
 .../apache/kylin/metrics/MetricsManager.java    |  40 +++--
 .../job/ExceptionRecordEventWrapper.java        |  40 -----
 .../kylin/metrics/job/JobPropertyEnum.java      |  56 -------
 .../metrics/job/JobRecordEventWrapper.java      |  68 --------
 .../metrics/lib/impl/RecordEventWrapper.java    |  61 -------
 .../metrics/lib/impl/TimedRecordEvent.java      |  47 ++++++
 .../kylin/metrics/property/JobPropertyEnum.java |  56 +++++++
 .../metrics/property/QueryCubePropertyEnum.java |  54 +++++++
 .../metrics/property/QueryPropertyEnum.java     |  52 ++++++
 .../metrics/property/QueryRPCPropertyEnum.java  |  51 ++++++
 .../query/CubeSegmentRecordEventWrapper.java    |  94 -----------
 .../metrics/query/QueryRecordEventWrapper.java  |  91 -----------
 .../metrics/query/RPCRecordEventWrapper.java    |  79 ---------
 .../org/apache/kylin/engine/mr/CubingJob.java   |  50 +++---
 .../kylin/rest/metrics/QueryMetricsFacade.java  | 159 ++++++++++++++-----
 .../apache/kylin/rest/request/SQLRequest.java   |   9 +-
 .../apache/kylin/rest/response/SQLResponse.java |  28 +++-
 .../apache/kylin/rest/service/QueryService.java |   3 +-
 server/src/main/resources/kylinMetrics.xml      |  69 ++++----
 .../kylin/rest/metrics/QueryMetricsTest.java    |  42 +++++
 25 files changed, 824 insertions(+), 641 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3a9a05e..625bbb3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1332,6 +1332,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.core.metrics.monitor-enabled", "false"));
     }
 
+    public boolean isKylinMetricsReporterForQueryEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-query-enabled", "false"));
+    }
+
+    public boolean isKylinMetricsReporterForJobEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.core.metrics.reporter-job-enabled", "true"));
+    }
+
     public String getKylinMetricsActiveReservoirDefaultClass() {
         return getOptional("kylin.core.metrics.active-reservoir-default-class",
                 "org.apache.kylin.metrics.lib.impl.StubReservoir");

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 0e9add8..5750e03 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -116,9 +116,12 @@ public class QueryContext {
         cubeSegmentStatisticsResult.setRealizationType(realizationType);
     }
 
-    public QueryStatisticsResult getQueryStatisticsResult() {
-        return new QueryStatisticsResult(rpcStatisticsList,
-                Lists.newArrayList(cubeSegmentStatisticsResultMap.values()));
+    public List<RPCStatistics> getRpcStatisticsList() {
+        return rpcStatisticsList;
+    }
+
+    public List<CubeSegmentStatisticsResult> getCubeSegmentStatisticsResultList() {
+        return Lists.newArrayList(cubeSegmentStatisticsResultMap.values());
     }
 
     public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId,
@@ -175,6 +178,8 @@ public class QueryContext {
     }
 
     public static class RPCStatistics implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private String realizationName;
         private String rpcServer;
 
@@ -228,6 +233,38 @@ public class QueryContext {
             return skippedRows;
         }
 
+        public void setRealizationName(String realizationName) {
+            this.realizationName = realizationName;
+        }
+
+        public void setRpcServer(String rpcServer) {
+            this.rpcServer = rpcServer;
+        }
+
+        public void setCallTimeMs(long callTimeMs) {
+            this.callTimeMs = callTimeMs;
+        }
+
+        public void setSkippedRows(long skippedRows) {
+            this.skippedRows = skippedRows;
+        }
+
+        public void setScannedRows(long scannedRows) {
+            this.scannedRows = scannedRows;
+        }
+
+        public void setReturnedRows(long returnedRows) {
+            this.returnedRows = returnedRows;
+        }
+
+        public void setAggregatedRows(long aggregatedRows) {
+            this.aggregatedRows = aggregatedRows;
+        }
+
+        public void setScannedBytes(long scannedBytes) {
+            this.scannedBytes = scannedBytes;
+        }
+
         public long getScannedRows() {
             return scannedRows;
         }
@@ -243,9 +280,16 @@ public class QueryContext {
         public long getScannedBytes() {
             return scannedBytes;
         }
+
+        @Override
+        public String toString() {
+            return "RPCStatistics [rpcServer=" + rpcServer + ",realizationName=" + realizationName + "]";
+        }
     }
 
     public static class CubeSegmentStatistics implements Serializable {
+        protected static final long serialVersionUID = 1L;
+
         private String cubeName;
         private String segmentName;
         private long sourceCuboidId;
@@ -289,6 +333,62 @@ public class QueryContext {
             this.storageScannedBytes += scanBytes;
         }
 
+        public void setCubeName(String cubeName) {
+            this.cubeName = cubeName;
+        }
+
+        public void setSegmentName(String segmentName) {
+            this.segmentName = segmentName;
+        }
+
+        public void setSourceCuboidId(long sourceCuboidId) {
+            this.sourceCuboidId = sourceCuboidId;
+        }
+
+        public void setTargetCuboidId(long targetCuboidId) {
+            this.targetCuboidId = targetCuboidId;
+        }
+
+        public void setFilterMask(long filterMask) {
+            this.filterMask = filterMask;
+        }
+
+        public void setIfSuccess(boolean ifSuccess) {
+            this.ifSuccess = ifSuccess;
+        }
+
+        public void setCallCount(long callCount) {
+            this.callCount = callCount;
+        }
+
+        public void setCallTimeSum(long callTimeSum) {
+            this.callTimeSum = callTimeSum;
+        }
+
+        public void setCallTimeMax(long callTimeMax) {
+            this.callTimeMax = callTimeMax;
+        }
+
+        public void setStorageSkippedRows(long storageSkippedRows) {
+            this.storageSkippedRows = storageSkippedRows;
+        }
+
+        public void setStorageScannedRows(long storageScannedRows) {
+            this.storageScannedRows = storageScannedRows;
+        }
+
+        public void setStorageReturnedRows(long storageReturnedRows) {
+            this.storageReturnedRows = storageReturnedRows;
+        }
+
+        public void setStorageAggregatedRows(long storageAggregatedRows) {
+            this.storageAggregatedRows = storageAggregatedRows;
+        }
+
+        public void setStorageScannedBytes(long storageScannedBytes) {
+            this.storageScannedBytes = storageScannedBytes;
+        }
+
         public String getCubeName() {
             return cubeName;
         }
@@ -344,14 +444,25 @@ public class QueryContext {
         public String getSegmentName() {
             return segmentName;
         }
+
+        @Override
+        public String toString() {
+            return "CubeSegmentStatistics [cubeName=" + cubeName + ",segmentName=" + segmentName + ",sourceCuboidId="
+                    + sourceCuboidId + ",targetCuboidId=" + targetCuboidId + ",filterMask=" + filterMask + "]";
+        }
     }
 
     public static class CubeSegmentStatisticsResult implements Serializable {
-        private final String queryType;
-        private final Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
+        protected static final long serialVersionUID = 1L;
+
+        private String queryType;
+        private Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
         private String realization;
         private int realizationType;
 
+        public CubeSegmentStatisticsResult() {
+        }
+
         public CubeSegmentStatisticsResult(String queryType,
                 Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
             this.queryType = queryType;
@@ -374,31 +485,29 @@ public class QueryContext {
             this.realizationType = realizationType;
         }
 
-        public String getQueryType() {
-            return queryType;
+        public void setQueryType(String queryType) {
+            this.queryType = queryType;
         }
 
-        public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
-            return cubeSegmentStatisticsMap;
+        public void setCubeSegmentStatisticsMap(
+                Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
+            this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap;
         }
-    }
 
-    public static class QueryStatisticsResult implements Serializable {
-        private final List<RPCStatistics> rpcStatisticsList;
-        private final List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList;
+        public String getQueryType() {
+            return queryType;
 
-        public QueryStatisticsResult(List<RPCStatistics> rpcStatisticsList,
-                List<CubeSegmentStatisticsResult> cubeSegmentStatisticsResultList) {
-            this.rpcStatisticsList = rpcStatisticsList;
-            this.cubeSegmentStatisticsResultList = cubeSegmentStatisticsResultList;
         }
 
-        public List<RPCStatistics> getRpcStatisticsList() {
-            return rpcStatisticsList;
+        public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
+            return cubeSegmentStatisticsMap;
         }
 
-        public List<CubeSegmentStatisticsResult> getCubeSegmentStatisticsResultList() {
-            return cubeSegmentStatisticsResultList;
+        @Override
+        public String toString() {
+            return "CubeSegmentStatisticsResult [queryType=" + queryType + ",realization=" + realization
+                    + ",realizationType=" + realizationType + ",cubeSegmentStatisticsMap=" + cubeSegmentStatisticsMap
+                    + "]";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
index 01fd134..21bc8f1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/JsonUtil.java
@@ -41,10 +41,12 @@ public class JsonUtil {
     // reuse the object mapper to save memory footprint
     private static final ObjectMapper mapper = new ObjectMapper();
     private static final ObjectMapper indentMapper = new ObjectMapper();
+    private static final ObjectMapper typeMapper = new ObjectMapper();
 
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         indentMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+        typeMapper.enableDefaultTyping();
     }
 
     public static <T> T readValue(File src, Class<T> valueType)
@@ -87,6 +89,10 @@ public class JsonUtil {
         return mapper.readTree(content);
     }
 
+    public static <T> T readValueWithTyping(InputStream src, Class<T> valueType) throws IOException {
+        return typeMapper.readValue(src, valueType);
+    }
+
     public static void writeValueIndent(OutputStream out, Object value)
             throws IOException, JsonGenerationException, JsonMappingException {
         indentMapper.writeValue(out, value);
@@ -109,4 +115,7 @@ public class JsonUtil {
         return indentMapper.writeValueAsString(value);
     }
 
-}
+    public static void writeValueWithTyping(OutputStream out, Object value) throws IOException {
+        typeMapper.writeValue(out, value);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-job/pom.xml
----------------------------------------------------------------------
diff --git a/core-job/pom.xml b/core-job/pom.xml
index 1f433bf..c3b912b 100644
--- a/core-job/pom.xml
+++ b/core-job/pom.xml
@@ -38,6 +38,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-cube</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
 
         <!-- Provided -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
new file mode 100644
index 0000000..904c4bd
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.metrics;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.MetricsManager;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.JobPropertyEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobMetricsFacade {
+    private static final Logger logger = LoggerFactory.getLogger(JobMetricsFacade.class);
+
+    public static void updateMetrics(JobStatisticsResult jobStats) {
+        if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) {
+            return;
+        }
+        /**
+         * report job related metrics
+         */
+        RecordEvent metricsEvent;
+        if (jobStats.throwable == null) {
+            metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob());
+            setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId,
+                    jobStats.jobType, jobStats.cubingType);
+            setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, jobStats.buildDuration,
+                    jobStats.waitResourceTime, jobStats.perBytesTimeCost, //
+                    jobStats.dColumnDistinct, jobStats.dDictBuilding, jobStats.dCubingInmem, jobStats.dHfileConvert);
+        } else {
+            metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
+            setJobExceptionWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId,
+                    jobStats.jobType, jobStats.cubingType, //
+                    jobStats.throwable.getClass());
+        }
+        MetricsManager.getInstance().update(metricsEvent);
+    }
+
+    private static void setJobWrapper(RecordEvent metricsEvent, String user, String projectName, String cubeName,
+            String jobId, String jobType, String cubingType) {
+        metricsEvent.put(JobPropertyEnum.USER.toString(), user);
+        metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+        metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+        metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
+    }
+
+    private static void setJobStats(RecordEvent metricsEvent, long tableSize, long cubeSize, long buildDuration,
+            long waitResourceTime, double perBytesTimeCost, long dColumnDistinct, long dDictBuilding, long dCubingInmem,
+            long dHfileConvert) {
+        metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
+        metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
+        metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
+        metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
+        metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
+    }
+
+    private static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String user,
+            String projectName, String cubeName, String jobId, String jobType, String cubingType,
+            Class<T> throwableClass) {
+        setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, jobType, cubingType);
+        metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwableClass.getName());
+    }
+
+    public static class JobStatisticsResult {
+        // dimensions
+        private String user;
+        private String projectName;
+        private String cubeName;
+        private String jobId;
+        private String jobType;
+        private String cubingType;
+
+        // statistics
+        private long tableSize;
+        private long cubeSize;
+        private long buildDuration;
+        private long waitResourceTime;
+        private double perBytesTimeCost;
+
+        // step statistics
+        private long dColumnDistinct = 0L;
+        private long dDictBuilding = 0L;
+        private long dCubingInmem = 0L;
+        private long dHfileConvert = 0L;
+
+        // exception
+        private Throwable throwable;
+
+        public void setWrapper(String user, String projectName, String cubeName, String jobId, String jobType,
+                String cubingType) {
+            this.user = user;
+            this.projectName = projectName;
+            this.cubeName = cubeName;
+            this.jobId = jobId;
+            this.jobType = jobType;
+            this.cubingType = cubingType;
+        }
+
+        public void setJobStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime,
+                double perBytesTimeCost) {
+            this.tableSize = tableSize;
+            this.cubeSize = cubeSize;
+            this.buildDuration = buildDuration;
+            this.waitResourceTime = waitResourceTime;
+            this.perBytesTimeCost = perBytesTimeCost;
+        }
+
+        public void setJobStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
+            this.dColumnDistinct = dColumnDistinct;
+            this.dDictBuilding = dDictBuilding;
+            this.dCubingInmem = dCubingInmem;
+            this.dHfileConvert = dHfileConvert;
+        }
+
+        public void setJobException(Throwable throwable) {
+            this.throwable = throwable;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
index 2616c38..ce28bf6 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -62,11 +62,30 @@ public class MetricsManager {
         return instance;
     }
 
-    public static void setSystemCubeSink(Sink systemCubeSink) {
+    public static void initMetricsManager(Sink systemCubeSink,
+            Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) {
+        setSystemCubeSink(systemCubeSink);
+        setSourceReporterBindProps(sourceReporterBindProperties);
+        instance.init();
+    }
+
+    private static void setSystemCubeSink(Sink systemCubeSink) {
+        if (systemCubeSink == null) {
+            logger.warn("SystemCubeSink is not set and the default one will be chosen");
+            try {
+                Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass());
+                systemCubeSink = (Sink) clz.getConstructor().newInstance();
+            } catch (Exception e) {
+                logger.warn("Failed to initialize the "
+                        + KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()
+                        + ". The StubSink will be used");
+                systemCubeSink = new StubSink();
+            }
+        }
         scSink = systemCubeSink;
     }
 
-    public static void setSourceReporterBindProps(
+    private static void setSourceReporterBindProps(
             Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) {
         sourceReporterBindProps = Maps.newHashMapWithExpectedSize(sourceReporterBindProperties.size());
         for (ActiveReservoir activeReservoir : sourceReporterBindProperties.keySet()) {
@@ -88,20 +107,7 @@ public class MetricsManager {
         }
     }
 
-    public void init() {
-        if (scSink == null) {
-            logger.warn("SystemCubeSink is not set and the default one will be chosen");
-            try {
-                Class clz = Class.forName(KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass());
-                scSink = (Sink) clz.getConstructor().newInstance();
-            } catch (Exception e) {
-                logger.warn(
-                        "Failed to initialize the " + KylinConfig.getInstanceFromEnv().getKylinSystemCubeSinkDefaultClass()
-                                + ". The StubSink will be used");
-                scSink = new StubSink();
-            }
-        }
-
+    private void init() {
         if (KylinConfig.getInstanceFromEnv().isKylinMetricsMonitorEnabled()) {
             logger.info("Kylin metrics monitor is enabled.");
             int nameIdx = 0;
@@ -136,7 +142,7 @@ public class MetricsManager {
         }
     }
 
-    public String getSystemTableFromSubject(String subject) {
+    public static String getSystemTableFromSubject(String subject) {
         return scSink.getTableFromSubject(subject);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
deleted file mode 100644
index 8d56025..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.job;
-
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
-
-public class ExceptionRecordEventWrapper extends RecordEventWrapper {
-
-    public ExceptionRecordEventWrapper(RecordEvent metricsEvent) {
-        super(metricsEvent);
-    }
-
-    public <T extends Throwable> void setWrapper(String projectName, String cubeName, String jobId, String jobType,
-            String cubingType, Class<T> exceptionClassName) {
-        this.metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
-        this.metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
-        this.metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
-        this.metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
-        this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
-        this.metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), exceptionClassName.getName());
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
deleted file mode 100644
index be32424..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.job;
-
-import com.google.common.base.Strings;
-
-public enum JobPropertyEnum {
-    ID_CODE("JOB_ID"), PROJECT("PROJECT"), CUBE("CUBE_NAME"), TYPE("JOB_TYPE"), ALGORITHM("CUBING_TYPE"), STATUS(
-            "JOB_STATUS"), EXCEPTION("EXCEPTION"), //
-    SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), BUILD_DURATION("DURATION"), WAIT_RESOURCE_TIME(
-            "WAIT_RESOURCE_TIME"), PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), STEP_DURATION_DISTINCT_COLUMNS(
-                    "STEP_DURATION_DISTINCT_COLUMNS"), STEP_DURATION_DICTIONARY(
-                            "STEP_DURATION_DICTIONARY"), STEP_DURATION_INMEM_CUBING(
-                                    "STEP_DURATION_INMEM_CUBING"), STEP_DURATION_HFILE_CONVERT(
-                                            "STEP_DURATION_HFILE_CONVERT");
-
-    private final String propertyName;
-
-    JobPropertyEnum(String name) {
-        this.propertyName = name;
-    }
-
-    public static JobPropertyEnum getByName(String name) {
-        if (Strings.isNullOrEmpty(name)) {
-            return null;
-        }
-        for (JobPropertyEnum property : JobPropertyEnum.values()) {
-            if (property.propertyName.equals(name.toUpperCase())) {
-                return property;
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return propertyName;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
deleted file mode 100644
index 6cd197e..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.job;
-
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
-
-public class JobRecordEventWrapper extends RecordEventWrapper {
-
-    public static final long MIN_SOURCE_SIZE = 33554432L; //32MB per block created by the first step
-
-    public JobRecordEventWrapper(RecordEvent metricsEvent) {
-        super(metricsEvent);
-        initStats();
-    }
-
-    public void initStats() {
-        this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 0L);
-        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0L);
-    }
-
-    public void setWrapper(String projectName, String cubeName, String jobId, String jobType, String cubingType) {
-        this.metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
-        this.metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
-        this.metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
-        this.metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
-        this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
-    }
-
-    public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime,
-            double perBytesTimeCost) {
-        this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
-        this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
-        this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
-        this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
-        this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost);
-    }
-
-    public void setStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
-        this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
deleted file mode 100644
index 7031129..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl;
-
-import java.io.Serializable;
-
-import org.apache.kylin.metrics.lib.Record;
-
-public class RecordEventWrapper implements Serializable {
-
-    protected final RecordEvent metricsEvent;
-
-    public RecordEventWrapper(RecordEvent metricsEvent) {
-        this.metricsEvent = metricsEvent;
-
-        //Add time details
-        addTimeDetails();
-    }
-
-    private void addTimeDetails() {
-        RecordEventTimeDetail dateDetail = new RecordEventTimeDetail(metricsEvent.getTime());
-        metricsEvent.put(TimePropertyEnum.YEAR.toString(), dateDetail.year_begin_date);
-        metricsEvent.put(TimePropertyEnum.MONTH.toString(), dateDetail.month_begin_date);
-        metricsEvent.put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), dateDetail.week_begin_date);
-        metricsEvent.put(TimePropertyEnum.DAY_DATE.toString(), dateDetail.date);
-        metricsEvent.put(TimePropertyEnum.DAY_TIME.toString(), dateDetail.time);
-        metricsEvent.put(TimePropertyEnum.TIME_HOUR.toString(), dateDetail.hour);
-        metricsEvent.put(TimePropertyEnum.TIME_MINUTE.toString(), dateDetail.minute);
-        metricsEvent.put(TimePropertyEnum.TIME_SECOND.toString(), dateDetail.second);
-    }
-
-    public void resetTime() {
-        metricsEvent.resetTime();
-        addTimeDetails();
-    }
-
-    public Record getMetricsRecord() {
-        return metricsEvent;
-    }
-
-    @Override
-    public String toString() {
-        return metricsEvent.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
new file mode 100644
index 0000000..a866163
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+public class TimedRecordEvent extends RecordEvent {
+
+    public TimedRecordEvent(String eventType) {
+        super(eventType);
+
+        //Add time details
+        addTimeDetails();
+    }
+
+    private void addTimeDetails() {
+        RecordEventTimeDetail dateDetail = new RecordEventTimeDetail(getTime());
+        put(TimePropertyEnum.YEAR.toString(), dateDetail.year_begin_date);
+        put(TimePropertyEnum.MONTH.toString(), dateDetail.month_begin_date);
+        put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), dateDetail.week_begin_date);
+        put(TimePropertyEnum.DAY_DATE.toString(), dateDetail.date);
+        put(TimePropertyEnum.DAY_TIME.toString(), dateDetail.time);
+        put(TimePropertyEnum.TIME_HOUR.toString(), dateDetail.hour);
+        put(TimePropertyEnum.TIME_MINUTE.toString(), dateDetail.minute);
+        put(TimePropertyEnum.TIME_SECOND.toString(), dateDetail.second);
+    }
+
+    @Override
+    public void resetTime() {
+        super.resetTime();
+        addTimeDetails();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
new file mode 100644
index 0000000..bbe987a
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.property;
+
+import com.google.common.base.Strings;
+
+public enum JobPropertyEnum {
+    ID_CODE("JOB_ID"), USER("USER"), PROJECT("PROJECT"), CUBE("CUBE_NAME"), TYPE("JOB_TYPE"), ALGORITHM(
+            "CUBING_TYPE"), STATUS("JOB_STATUS"), EXCEPTION("EXCEPTION"), //
+    SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), BUILD_DURATION("DURATION"), WAIT_RESOURCE_TIME(
+            "WAIT_RESOURCE_TIME"), PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), STEP_DURATION_DISTINCT_COLUMNS(
+                    "STEP_DURATION_DISTINCT_COLUMNS"), STEP_DURATION_DICTIONARY(
+                            "STEP_DURATION_DICTIONARY"), STEP_DURATION_INMEM_CUBING(
+                                    "STEP_DURATION_INMEM_CUBING"), STEP_DURATION_HFILE_CONVERT(
+                                            "STEP_DURATION_HFILE_CONVERT");
+
+    private final String propertyName;
+
+    JobPropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static JobPropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        for (JobPropertyEnum property : JobPropertyEnum.values()) {
+            if (property.propertyName.equals(name.toUpperCase())) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java
new file mode 100644
index 0000000..d2d5bb4
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.property;
+
+import com.google.common.base.Strings;
+
+public enum QueryCubePropertyEnum {
+    PROJECT("PROJECT"), CUBE("CUBE_NAME"), SEGMENT("SEGMENT_NAME"), CUBOID_SOURCE("CUBOID_SOURCE"), CUBOID_TARGET(
+            "CUBOID_TARGET"), IF_MATCH("IF_MATCH"), FILTER_MASK("FILTER_MASK"), IF_SUCCESS("IF_SUCCESS"), //
+    TIME_SUM("STORAGE_CALL_TIME_SUM"), TIME_MAX("STORAGE_CALL_TIME_MAX"), WEIGHT_PER_HIT("WEIGHT_PER_HIT"), CALL_COUNT(
+            "STORAGE_CALL_COUNT"), SKIP_COUNT("STORAGE_COUNT_SKIP"), SCAN_COUNT("STORAGE_COUNT_SCAN"), RETURN_COUNT(
+                    "STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT(
+                            "STORAGE_COUNT_AGGREGATE_FILTER"), AGGR_COUNT("STORAGE_COUNT_AGGREGATE");
+
+    private final String propertyName;
+
+    QueryCubePropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QueryCubePropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        for (QueryCubePropertyEnum property : QueryCubePropertyEnum.values()) {
+            if (property.propertyName.equals(name.toUpperCase())) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
new file mode 100644
index 0000000..6fe5b0f
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.property;
+
+import com.google.common.base.Strings;
+
+public enum QueryPropertyEnum {
+    ID_CODE("QUERY_HASH_CODE"), TYPE("QUERY_TYPE"), USER("USER"), PROJECT("PROJECT"), REALIZATION(
+            "REALIZATION"), REALIZATION_TYPE("REALIZATION_TYPE"), EXCEPTION("EXCEPTION"), //
+    TIME_COST("QUERY_TIME_COST"), CALCITE_RETURN_COUNT("CALCITE_COUNT_RETURN"), STORAGE_RETURN_COUNT(
+            "STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT("CALCITE_COUNT_AGGREGATE_FILTER");
+
+    private final String propertyName;
+
+    QueryPropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QueryPropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        for (QueryPropertyEnum property : QueryPropertyEnum.values()) {
+            if (property.propertyName.equals(name.toUpperCase())) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java
new file mode 100644
index 0000000..049b9ed
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.property;
+
+import com.google.common.base.Strings;
+
+public enum QueryRPCPropertyEnum {
+    PROJECT("PROJECT"), REALIZATION("REALIZATION"), RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), //
+    CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), SCAN_COUNT("COUNT_SCAN"), RETURN_COUNT(
+            "COUNT_RETURN"), AGGR_FILTER_COUNT("COUNT_AGGREGATE_FILTER"), AGGR_COUNT("COUNT_AGGREGATE");
+
+    private final String propertyName;
+
+    QueryRPCPropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QueryRPCPropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        for (QueryRPCPropertyEnum property : QueryRPCPropertyEnum.values()) {
+            if (property.propertyName.equals(name.toUpperCase())) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
deleted file mode 100644
index b5fa218..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.query;
-
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
-
-import com.google.common.base.Strings;
-
-public class CubeSegmentRecordEventWrapper extends RecordEventWrapper {
-
-    public CubeSegmentRecordEventWrapper(RecordEvent metricsEvent) {
-        super(metricsEvent);
-    }
-
-    public void setWrapper(String projectName, String cubeName, String segmentName, long sourceCuboidId,
-            long targetCuboidId, long filterMask) {
-        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
-        this.metricsEvent.put(PropertyEnum.CUBE.toString(), cubeName);
-        this.metricsEvent.put(PropertyEnum.SEGMENT.toString(), segmentName);
-        this.metricsEvent.put(PropertyEnum.CUBOID_SOURCE.toString(), sourceCuboidId);
-        this.metricsEvent.put(PropertyEnum.CUBOID_TARGET.toString(), targetCuboidId);
-        this.metricsEvent.put(PropertyEnum.IF_MATCH.toString(), sourceCuboidId == targetCuboidId);
-        this.metricsEvent.put(PropertyEnum.FILTER_MASK.toString(), filterMask);
-    }
-
-    public void setStats(long callCount, long callTimeSum, long callTimeMax, long skipCount, long scanCount,
-            long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) {
-        this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), callCount);
-        this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), callTimeSum);
-        this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), callTimeMax);
-        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount);
-        this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), scanCount);
-        this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), returnCount);
-        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount);
-        this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), aggrCount);
-        this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), ifSuccess);
-        this.metricsEvent.put(PropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit);
-    }
-
-    public Object getProperty(String key) {
-        return this.metricsEvent.get(key);
-    }
-
-    public enum PropertyEnum {
-        PROJECT("PROJECT"), CUBE("CUBE_NAME"), SEGMENT("SEGMENT_NAME"), CUBOID_SOURCE("CUBOID_SOURCE"), CUBOID_TARGET(
-                "CUBOID_TARGET"), IF_MATCH("IF_MATCH"), FILTER_MASK("FILTER_MASK"), IF_SUCCESS("IF_SUCCESS"), //
-        TIME_SUM("STORAGE_CALL_TIME_SUM"), TIME_MAX("STORAGE_CALL_TIME_MAX"), WEIGHT_PER_HIT(
-                "WEIGHT_PER_HIT"), CALL_COUNT("STORAGE_CALL_COUNT"), SKIP_COUNT("STORAGE_COUNT_SKIP"), SCAN_COUNT(
-                        "STORAGE_COUNT_SCAN"), RETURN_COUNT("STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT(
-                                "STORAGE_COUNT_AGGREGATE_FILTER"), AGGR_COUNT("STORAGE_COUNT_AGGREGATE");
-
-        private final String propertyName;
-
-        PropertyEnum(String name) {
-            this.propertyName = name;
-        }
-
-        public static PropertyEnum getByName(String name) {
-            if (Strings.isNullOrEmpty(name)) {
-                return null;
-            }
-            for (PropertyEnum property : PropertyEnum.values()) {
-                if (property.propertyName.equals(name.toUpperCase())) {
-                    return property;
-                }
-            }
-
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return propertyName;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
deleted file mode 100644
index 8ea0222..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.query;
-
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
-
-public class QueryRecordEventWrapper extends RecordEventWrapper {
-
-    private static final Logger logger = LoggerFactory.getLogger(QueryRecordEventWrapper.class);
-
-    public QueryRecordEventWrapper(RecordEvent metricsEvent) {
-        super(metricsEvent);
-    }
-
-    public void setWrapper(long queryHashCode, String queryType, String projectName, String realizationName,
-            int realizationType, Throwable throwable) {
-        this.metricsEvent.put(PropertyEnum.ID_CODE.toString(), queryHashCode);
-        this.metricsEvent.put(PropertyEnum.TYPE.toString(), queryType);
-        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
-        this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName);
-        this.metricsEvent.put(PropertyEnum.REALIZATION_TYPE.toString(), realizationType);
-        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    public void setStats(long callTimeMs, long returnCountByCalcite, long returnCountByStorage) {
-        this.metricsEvent.put(PropertyEnum.TIME_COST.toString(), callTimeMs);
-        this.metricsEvent.put(PropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite);
-        this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), returnCountByStorage);
-        long countAggrAndFilter = returnCountByStorage - returnCountByCalcite;
-        if (countAggrAndFilter < 0) {
-            countAggrAndFilter = 0;
-            logger.warn(returnCountByStorage + " rows returned by storage less than " + returnCountByCalcite
-                    + " rows returned by calcite");
-        }
-        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), countAggrAndFilter);
-    }
-
-    public enum PropertyEnum {
-        ID_CODE("QUERY_HASH_CODE"), TYPE("QUERY_TYPE"), PROJECT("PROJECT"), REALIZATION(
-                "REALIZATION"), REALIZATION_TYPE("REALIZATION_TYPE"), EXCEPTION("EXCEPTION"), //
-        TIME_COST("QUERY_TIME_COST"), CALCITE_RETURN_COUNT("CALCITE_COUNT_RETURN"), STORAGE_RETURN_COUNT(
-                "STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT("CALCITE_COUNT_AGGREGATE_FILTER");
-
-        private final String propertyName;
-
-        PropertyEnum(String name) {
-            this.propertyName = name;
-        }
-
-        public static PropertyEnum getByName(String name) {
-            if (Strings.isNullOrEmpty(name)) {
-                return null;
-            }
-            for (PropertyEnum property : PropertyEnum.values()) {
-                if (property.propertyName.equals(name.toUpperCase())) {
-                    return property;
-                }
-            }
-
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return propertyName;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
deleted file mode 100644
index abcbb61..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.query;
-
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
-
-import com.google.common.base.Strings;
-
-public class RPCRecordEventWrapper extends RecordEventWrapper {
-
-    public RPCRecordEventWrapper(RecordEvent metricsEvent) {
-        super(metricsEvent);
-    }
-
-    public void setWrapper(String projectName, String realizationName, String rpcServer, Throwable throwable) {
-        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
-        this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName);
-        this.metricsEvent.put(PropertyEnum.RPC_SERVER.toString(), rpcServer);
-        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    public void setStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount) {
-        this.metricsEvent.put(PropertyEnum.CALL_TIME.toString(), callTimeMs);
-        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
-        this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
-        this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
-        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
-        this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
-    }
-
-    public enum PropertyEnum {
-        PROJECT("PROJECT"), REALIZATION("REALIZATION"), RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), //
-        CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), SCAN_COUNT("COUNT_SCAN"), RETURN_COUNT(
-                "COUNT_RETURN"), AGGR_FILTER_COUNT("COUNT_AGGREGATE_FILTER"), AGGR_COUNT("COUNT_AGGREGATE");
-
-        private final String propertyName;
-
-        PropertyEnum(String name) {
-            this.propertyName = name;
-        }
-
-        public static PropertyEnum getByName(String name) {
-            if (Strings.isNullOrEmpty(name)) {
-                return null;
-            }
-            for (PropertyEnum property : PropertyEnum.values()) {
-                if (property.propertyName.equals(name.toUpperCase())) {
-                    return property;
-                }
-            }
-
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return propertyName;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index c4b6e12..3fb1650 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -46,12 +46,9 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.metrics.JobMetricsFacade;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metrics.MetricsManager;
-import org.apache.kylin.metrics.job.ExceptionRecordEventWrapper;
-import org.apache.kylin.metrics.job.JobRecordEventWrapper;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -245,41 +242,32 @@ public class CubingJob extends DefaultChainedExecutable {
     protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) {
         super.onStatusChange(context, result, state);
 
-        /**
-         * report job related metrics
-         */
+        updateMetrics(context, result, state);
+    }
+
+    protected void updateMetrics(ExecutableContext context, ExecuteResult result, ExecutableState state) {
+        JobMetricsFacade.JobStatisticsResult jobStats = new JobMetricsFacade.JobStatisticsResult();
+        jobStats.setWrapper(getSubmitter(), ProjectInstance.getNormalizedProjectName(getProjectName()),
+                CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
+                getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
+
         if (state == ExecutableState.SUCCEED) {
-            JobRecordEventWrapper jobRecordEventWrapper = new JobRecordEventWrapper(
-                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()));
-            jobRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
-                    CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
-                    getAlgorithm() == null ? "NULL" : getAlgorithm().toString());
-            long tableSize = findSourceSizeBytes();
-            long buildDuration = getDuration();
-            long waitResourceTime = getMapReduceWaitTime();
-            jobRecordEventWrapper.setStats(tableSize, findCubeSizeBytes(), buildDuration, waitResourceTime,
-                    getPerBytesTimeCost(tableSize, buildDuration - waitResourceTime));
+            jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), getDuration(), getMapReduceWaitTime(),
+                    getPerBytesTimeCost(findSourceSizeBytes(), getDuration()));
             if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) {
-                jobRecordEventWrapper.setStepStats(
-                        getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(), //
-                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), //
-                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), //
+                jobStats.setJobStepStats(
+                        getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(),
+                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(),
+                        getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(),
                         getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration());
             }
-            MetricsManager.getInstance().update(jobRecordEventWrapper.getMetricsRecord());
         } else if (state == ExecutableState.ERROR) {
-            ExceptionRecordEventWrapper exceptionRecordEventWrapper = new ExceptionRecordEventWrapper(
-                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()));
-            exceptionRecordEventWrapper.setWrapper(ProjectInstance.getNormalizedProjectName(getProjectName()),
-                    CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(),
-                    getAlgorithm() == null ? "NULL" : getAlgorithm().toString(),
-                    result.getThrowable() != null ? result.getThrowable().getClass() : Exception.class);
-            MetricsManager.getInstance().update(exceptionRecordEventWrapper.getMetricsRecord());
+            jobStats.setJobException(result.getThrowable() != null ? result.getThrowable() : new Exception());
         }
-
+        JobMetricsFacade.updateMetrics(jobStats);
     }
 
-    private double getPerBytesTimeCost(long size, long timeCost) {
+    private static double getPerBytesTimeCost(long size, long timeCost) {
         if (size <= 0) {
             return 0;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 8b58382..4e09f6f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -30,13 +30,15 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.query.CubeSegmentRecordEventWrapper;
-import org.apache.kylin.metrics.query.QueryRecordEventWrapper;
-import org.apache.kylin.metrics.query.RPCRecordEventWrapper;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
+import org.apache.kylin.metrics.property.QueryPropertyEnum;
+import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
@@ -66,6 +68,11 @@ public class QueryMetricsFacade {
     }
 
     public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse) {
+        updateMetricsToLocal(sqlRequest, sqlResponse);
+        updateMetricsToReservoir(sqlRequest, sqlResponse);
+    }
+
+    private static void updateMetricsToLocal(SQLRequest sqlRequest, SQLResponse sqlResponse) {
         if (!enabled)
             return;
 
@@ -78,57 +85,137 @@ public class QueryMetricsFacade {
 
         String cubeMetricName = projectName + ",sub=" + cubeName;
         update(getQueryMetrics(cubeMetricName), sqlResponse);
+    }
 
-        /**
-         * report query related metrics
-         */
-        final QueryContext.QueryStatisticsResult queryStatisticsResult = sqlResponse.getQueryStatistics();
-        for (QueryContext.RPCStatistics entry : queryStatisticsResult.getRpcStatisticsList()) {
-            RPCRecordEventWrapper rpcMetricsEventWrapper = new RPCRecordEventWrapper(
-                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()));
-            rpcMetricsEventWrapper.setWrapper(sqlRequest.getProject(), entry.getRealizationName(), entry.getRpcServer(),
-                    entry.getException());
-            rpcMetricsEventWrapper.setStats(entry.getCallTimeMs(), entry.getSkippedRows(), entry.getScannedRows(),
-                    entry.getReturnedRows(), entry.getAggregatedRows());
+    /**
+     * report query related metrics
+     */
+    private static void updateMetricsToReservoir(SQLRequest sqlRequest, SQLResponse sqlResponse) {
+        if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
+            return;
+        }
+        String user = SecurityContextHolder.getContext().getAuthentication().getName();
+        if (user == null) {
+            user = "unknown";
+        }
+        for (QueryContext.RPCStatistics entry : QueryContext.current().getRpcStatisticsList()) {
+            RecordEvent rpcMetricsEvent = new TimedRecordEvent(
+                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
+            setRPCWrapper(rpcMetricsEvent, //
+                    sqlRequest.getProject(), entry.getRealizationName(), entry.getRpcServer(), entry.getException());
+            setRPCStats(rpcMetricsEvent, //
+                    entry.getCallTimeMs(), entry.getSkippedRows(), entry.getScannedRows(), entry.getReturnedRows(),
+                    entry.getAggregatedRows());
             //For update rpc level related metrics
-            MetricsManager.getInstance().update(rpcMetricsEventWrapper.getMetricsRecord());
+            MetricsManager.getInstance().update(rpcMetricsEvent);
         }
         long sqlHashCode = getSqlHashCode(sqlRequest.getSql());
-        for (QueryContext.CubeSegmentStatisticsResult contextEntry : queryStatisticsResult
-                .getCubeSegmentStatisticsResultList()) {
-            QueryRecordEventWrapper queryMetricsEventWrapper = new QueryRecordEventWrapper(
-                    new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery()));
-            queryMetricsEventWrapper.setWrapper(sqlHashCode,
-                    sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(), sqlRequest.getProject(),
-                    contextEntry.getRealization(), contextEntry.getRealizationType(), sqlResponse.getThrowable());
+        for (QueryContext.CubeSegmentStatisticsResult contextEntry : sqlResponse.getCubeSegmentStatisticsList()) {
+            RecordEvent queryMetricsEvent = new TimedRecordEvent(
+                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery());
+            setQueryWrapper(queryMetricsEvent, //
+                    user, sqlHashCode, sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(),
+                    sqlRequest.getProject(), contextEntry.getRealization(), contextEntry.getRealizationType(),
+                    sqlResponse.getThrowable());
 
             long totalStorageReturnCount = 0L;
             for (Map<String, QueryContext.CubeSegmentStatistics> cubeEntry : contextEntry.getCubeSegmentStatisticsMap()
                     .values()) {
                 for (QueryContext.CubeSegmentStatistics segmentEntry : cubeEntry.values()) {
-                    CubeSegmentRecordEventWrapper cubeSegmentMetricsEventWrapper = new CubeSegmentRecordEventWrapper(
-                            new RecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube()));
+                    RecordEvent cubeSegmentMetricsEvent = new TimedRecordEvent(
+                            KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube());
 
-                    cubeSegmentMetricsEventWrapper.setWrapper(sqlRequest.getProject(), segmentEntry.getCubeName(),
-                            segmentEntry.getSegmentName(), segmentEntry.getSourceCuboidId(),
-                            segmentEntry.getTargetCuboidId(), segmentEntry.getFilterMask());
+                    setCubeWrapper(cubeSegmentMetricsEvent, //
+                            sqlRequest.getProject(), segmentEntry.getCubeName(), segmentEntry.getSegmentName(),
+                            segmentEntry.getSourceCuboidId(), segmentEntry.getTargetCuboidId(),
+                            segmentEntry.getFilterMask());
 
-                    cubeSegmentMetricsEventWrapper.setStats(segmentEntry.getCallCount(), segmentEntry.getCallTimeSum(),
-                            segmentEntry.getCallTimeMax(), segmentEntry.getStorageSkippedRows(),
-                            segmentEntry.getStorageScannedRows(), segmentEntry.getStorageReturnedRows(),
-                            segmentEntry.getStorageAggregatedRows(), segmentEntry.isIfSuccess(),
-                            1.0 / cubeEntry.size());
+                    setCubeStats(cubeSegmentMetricsEvent, //
+                            segmentEntry.getCallCount(), segmentEntry.getCallTimeSum(), segmentEntry.getCallTimeMax(),
+                            segmentEntry.getStorageSkippedRows(), segmentEntry.getStorageScannedRows(),
+                            segmentEntry.getStorageReturnedRows(), segmentEntry.getStorageAggregatedRows(),
+                            segmentEntry.isIfSuccess(), 1.0 / cubeEntry.size());
 
                     totalStorageReturnCount += segmentEntry.getStorageReturnedRows();
                     //For update cube segment level related query metrics
-                    MetricsManager.getInstance().update(cubeSegmentMetricsEventWrapper.getMetricsRecord());
+                    MetricsManager.getInstance().update(cubeSegmentMetricsEvent);
                 }
             }
-            queryMetricsEventWrapper.setStats(sqlResponse.getDuration(), sqlResponse.getResults().size(),
-                    totalStorageReturnCount);
+            setQueryStats(queryMetricsEvent, //
+                    sqlResponse.getDuration(), sqlResponse.getResults().size(), totalStorageReturnCount);
             //For update query level metrics
-            MetricsManager.getInstance().update(queryMetricsEventWrapper.getMetricsRecord());
+            MetricsManager.getInstance().update(queryMetricsEvent);
+        }
+    }
+
+    private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
+            String rpcServer, Throwable throwable) {
+        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer);
+        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
+                throwable == null ? "NULL" : throwable.getClass().getName());
+    }
+
+    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount,
+            long returnCount, long aggrCount) {
+        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs);
+        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
+        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
+        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
+    }
+
+    private static void setCubeWrapper(RecordEvent metricsEvent, String projectName, String cubeName,
+            String segmentName, long sourceCuboidId, long targetCuboidId, long filterMask) {
+        metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), segmentName);
+        metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), sourceCuboidId);
+        metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), targetCuboidId);
+        metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), sourceCuboidId == targetCuboidId);
+        metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), filterMask);
+    }
+
+    private static void setCubeStats(RecordEvent metricsEvent, long callCount, long callTimeSum, long callTimeMax,
+            long skipCount, long scanCount, long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) {
+        metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), callCount);
+        metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), callTimeSum);
+        metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), callTimeMax);
+        metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), skipCount);
+        metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), scanCount);
+        metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), returnCount);
+        metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount);
+        metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), aggrCount);
+        metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), ifSuccess);
+        metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit);
+    }
+
+    private static void setQueryWrapper(RecordEvent metricsEvent, String user, long queryHashCode, String queryType,
+            String projectName, String realizationName, int realizationType, Throwable throwable) {
+        metricsEvent.put(QueryPropertyEnum.USER.toString(), user);
+        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), queryHashCode);
+        metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType);
+        metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), realizationType);
+        metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(),
+                throwable == null ? "NULL" : throwable.getClass().getName());
+    }
+
+    private static void setQueryStats(RecordEvent metricsEvent, long callTimeMs, long returnCountByCalcite,
+            long returnCountByStorage) {
+        metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs);
+        metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite);
+        metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), returnCountByStorage);
+        long countAggrAndFilter = returnCountByStorage - returnCountByCalcite;
+        if (countAggrAndFilter < 0) {
+            countAggrAndFilter = 0;
+            logger.warn(returnCountByStorage + " rows returned by storage less than " + returnCountByCalcite
+                    + " rows returned by calcite");
         }
+        metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), countAggrAndFilter);
     }
 
     private static void update(QueryMetrics queryMetrics, SQLResponse sqlResponse) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/server-base/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index a98aba7..85a7564 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -53,7 +53,7 @@ public class SQLRequest implements Serializable {
     }
 
     public String getProject() {
-        return project;
+        return norm(project);
     }
 
     public void setProject(String project) {
@@ -114,6 +114,13 @@ public class SQLRequest implements Serializable {
         return cacheKey;
     }
 
+    public static String norm(String str) {
+        if (str == null) {
+            return null;
+        }
+        return str.toUpperCase();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index bed4764..ee85162 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -21,11 +21,12 @@ package org.apache.kylin.rest.response;
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Lists;
 
 public class SQLResponse implements Serializable {
     protected static final long serialVersionUID = 1L;
@@ -205,11 +206,24 @@ public class SQLResponse implements Serializable {
     }
 
     @JsonIgnore
-    public QueryContext.QueryStatisticsResult getQueryStatistics() {
-        return (QueryContext.QueryStatisticsResult) SerializationUtils.deserialize(queryStatistics);
-    }
-
-    public void setQueryStatistics(QueryContext.QueryStatisticsResult queryStatisticsResult) {
-        this.queryStatistics = SerializationUtils.serialize(queryStatisticsResult);
+    public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
+        try {
+            return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList()
+                    : (List<QueryContext.CubeSegmentStatisticsResult>) SerializationUtils.deserialize(queryStatistics);
+        } catch (Exception e) { // deserialize exception should not block query
+            System.out.println("Error while deserialize queryStatistics due to " + e);
+            return Lists.newArrayList();
+        }
+    }
+
+    public void setCubeSegmentStatisticsList(
+            List<QueryContext.CubeSegmentStatisticsResult> cubeSegmentStatisticsList) {
+        try {
+            this.queryStatistics = cubeSegmentStatisticsList == null ? null
+                    : SerializationUtils.serialize((Serializable) cubeSegmentStatisticsList);
+        } catch (Exception e) { // serialize exception should not block query
+            System.out.println("Error while serialize queryStatistics due to " + e);
+            this.queryStatistics = null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ec897aae/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index cf37e66..f8566de 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -463,6 +463,7 @@ public class QueryService extends BasicService {
                 sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));
                 sqlResponse.setTotalScanCount(queryContext.getScannedRows());
                 sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+                sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
 
                 if (queryCacheEnabled && e.getCause() != null
                         && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
@@ -969,7 +970,7 @@ public class QueryService extends BasicService {
                 isPushDown);
         response.setTotalScanCount(QueryContext.current().getScannedRows());
         response.setTotalScanBytes(QueryContext.current().getScannedBytes());
-        response.setQueryStatistics(QueryContext.current().getQueryStatisticsResult());
+        response.setCubeSegmentStatisticsList(QueryContext.current().getCubeSegmentStatisticsResultList());
         return response;
     }