You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/27 14:05:54 UTC

[4/6] kylin git commit: APACHE-KYLIN-2723: Introduce metrics collector for query & job metrics

APACHE-KYLIN-2723: Introduce metrics collector for query & job metrics

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d8af4518
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d8af4518
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d8af4518

Branch: refs/heads/master
Commit: d8af4518d80aca1cb3e58c4eff2ca0563bb02edc
Parents: b040c37
Author: Wang Ken <mi...@ebay.com>
Authored: Tue Aug 8 23:57:22 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Oct 27 21:58:08 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 ++
 .../apache/kylin/metrics/MetricsManager.java    | 142 +++++++++++++++++++
 .../job/ExceptionRecordEventWrapper.java        |  40 ++++++
 .../kylin/metrics/job/JobPropertyEnum.java      |  56 ++++++++
 .../metrics/job/JobRecordEventWrapper.java      |  83 +++++++++++
 .../query/CubeSegmentRecordEventWrapper.java    | 124 ++++++++++++++++
 .../metrics/query/QueryRecordEventWrapper.java  | 103 ++++++++++++++
 .../metrics/query/RPCRecordEventWrapper.java    |  82 +++++++++++
 pom.xml                                         |  15 ++
 server-base/pom.xml                             |  12 ++
 server/src/main/resources/kylinMetrics.xml      |  84 +++++++++++
 server/src/main/webapp/WEB-INF/web.xml          |   1 +
 .../kylin/rest/service/ServiceTestBase.java     |   3 +-
 13 files changed, 753 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0faea9c..922c10b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1328,8 +1328,17 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metrics.perflogger-class", "org.apache.kylin.common.metrics.perflog.PerfLogger");
     }
 
+    public boolean isMetricsMonitorEnabled() { // off unless the property is "true" (any letter case)
+        return "true".equalsIgnoreCase(getOptional("kylin.metrics.monitor-enabled", "false"));
+    }
+
     public String getMetricsActiveReservoirDefaultClass() {
         return getOptional("kylin.metrics.active-reservoir-default-class",
                 "org.apache.kylin.metrics.lib.impl.StubReservoir");
     }
+
+    public String getSystemCubeSinkDefaultClass() { // class name of the sink used when none is set explicitly
+        final String fallbackSink = "org.apache.kylin.metrics.lib.impl.hive.HiveSink";
+        return getOptional("kylin.metrics.system-cube-sink-default-class", fallbackSink);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
new file mode 100644
index 0000000..8899f07
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.Sink;
+import org.apache.kylin.metrics.lib.impl.MetricsSystem;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.apache.kylin.metrics.lib.impl.StubSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class MetricsManager {
+    // Singleton that binds ActiveReservoirs to their reporters and forwards metric records to them.
+    public static final String SYSTEM_PROJECT = "KYLIN_SYSTEM";
+    private static final Logger logger = LoggerFactory.getLogger(MetricsManager.class);
+    private static final MetricsManager instance = new MetricsManager();
+    private static final String METHOD_FOR_REGISTRY = "forRegistry";
+    private static Map<ActiveReservoir, List<Pair<Class<? extends ActiveReservoirReporter>, Properties>>> sourceReporterBindProps = Maps
+            .newHashMap();
+    private static Sink scSink;
+    private final Set<String> activeReservoirPointers = Sets.newHashSet(); // register names added by init()
+
+    private MetricsManager() { // singleton: obtain the instance through getInstance()
+    }
+
+    public static MetricsManager getInstance() {
+        return instance;
+    }
+
+    public static void setSystemCubeSink(Sink systemCubeSink) {
+        scSink = systemCubeSink;
+    }
+
+    /** Resolves reporter class names; unknown classes and non-ActiveReservoirReporter classes are logged and skipped. */
+    public static void setSourceReporterBindProps(
+            Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) {
+        sourceReporterBindProps = Maps.newHashMapWithExpectedSize(sourceReporterBindProperties.size());
+        for (Map.Entry<ActiveReservoir, List<Pair<String, Properties>>> bind : sourceReporterBindProperties.entrySet()) {
+            List<Pair<Class<? extends ActiveReservoirReporter>, Properties>> values = Lists
+                    .newArrayListWithExpectedSize(bind.getValue().size());
+            sourceReporterBindProps.put(bind.getKey(), values);
+            for (Pair<String, Properties> entry : bind.getValue()) {
+                try {
+                    Class<?> clz = Class.forName(entry.getKey());
+                    if (ActiveReservoirReporter.class.isAssignableFrom(clz)) {
+                        values.add(new Pair<Class<? extends ActiveReservoirReporter>, Properties>(
+                                clz.asSubclass(ActiveReservoirReporter.class), entry.getValue()));
+                    } else {
+                        logger.warn("The class " + clz + " is not a sub class of " + ActiveReservoirReporter.class);
+                    }
+                } catch (ClassNotFoundException e) {
+                    logger.warn("Cannot find class " + entry.getKey(), e);
+                }
+            }
+        }
+    }
+
+    /** Picks the system cube sink (StubSink on failure); if monitoring is on, starts reporters and registers reservoirs. */
+    public void init() {
+        if (scSink == null) {
+            logger.warn("SystemCubeSink is not set and the default one will be chosen");
+            String sinkClass = KylinConfig.getInstanceFromEnv().getSystemCubeSinkDefaultClass();
+            try {
+                scSink = (Sink) Class.forName(sinkClass).getConstructor().newInstance();
+            } catch (Exception e) {
+                logger.warn("Failed to initialize the " + sinkClass + ". The StubSink will be used", e);
+                scSink = new StubSink();
+            }
+        }
+
+        if (!KylinConfig.getInstanceFromEnv().isMetricsMonitorEnabled()) {
+            logger.info("Kylin metrics monitor is not enabled!!!");
+            return;
+        }
+        logger.info("Kylin metrics monitor is enabled.");
+        int nameIdx = 0; // advanced once per reservoir so every register name is distinct
+        for (Map.Entry<ActiveReservoir, List<Pair<Class<? extends ActiveReservoirReporter>, Properties>>> bind : sourceReporterBindProps
+                .entrySet()) {
+            ActiveReservoir activeReservoir = bind.getKey();
+            String registerName = MetricsSystem.name(MetricsManager.class,
+                    "-" + (nameIdx++) + "-" + activeReservoir.toString());
+            activeReservoirPointers.add(registerName);
+            for (Pair<Class<? extends ActiveReservoirReporter>, Properties> subEntry : bind.getValue()) {
+                try {
+                    Method method = subEntry.getKey().getMethod(METHOD_FOR_REGISTRY, ActiveReservoir.class);
+                    ((ReporterBuilder) method.invoke(null, activeReservoir)).setConfig(subEntry.getValue()).build()
+                            .start();
+                } catch (Exception e) {
+                    logger.warn("Cannot initialize ActiveReservoirReporter: Builder class - " + subEntry.getKey()
+                            + ", Properties - " + subEntry.getValue(), e);
+                }
+            }
+            Metrics.register(registerName, activeReservoir);
+        }
+        Preconditions.checkArgument(activeReservoirPointers.size() == sourceReporterBindProps.keySet().size(),
+                "Duplicate register names exist!!!");
+    }
+
+    public void update(Record record) { // hand the record to every reservoir registered by init()
+        for (String registerName : activeReservoirPointers) {
+            Metrics.activeReservoir(registerName).update(record);
+        }
+    }
+
+    public String getSystemTableFromSubject(String subject) { // scSink is assigned by setSystemCubeSink() or init()
+        return scSink.getTableFromSubject(subject);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
new file mode 100644
index 0000000..8d56025
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+public class ExceptionRecordEventWrapper extends RecordEventWrapper {
+
+    public ExceptionRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+    }
+
+    /** Stores the job identity fields and the exception's class name under their JobPropertyEnum keys. */
+    public <T extends Throwable> void setWrapper(String projectName, String cubeName, String jobId, String jobType,
+            String cubingType, Class<T> exceptionClassName) {
+        metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+        metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+        metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
+        metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), exceptionClassName.getName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
new file mode 100644
index 0000000..be32424
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import com.google.common.base.Strings;
+
+public enum JobPropertyEnum {
+    ID_CODE("JOB_ID"), PROJECT("PROJECT"), CUBE("CUBE_NAME"), TYPE("JOB_TYPE"), ALGORITHM("CUBING_TYPE"), //
+    STATUS("JOB_STATUS"), EXCEPTION("EXCEPTION"), SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), //
+    BUILD_DURATION("DURATION"), WAIT_RESOURCE_TIME("WAIT_RESOURCE_TIME"), //
+    PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), //
+    STEP_DURATION_DISTINCT_COLUMNS("STEP_DURATION_DISTINCT_COLUMNS"), //
+    STEP_DURATION_DICTIONARY("STEP_DURATION_DICTIONARY"), //
+    STEP_DURATION_INMEM_CUBING("STEP_DURATION_INMEM_CUBING"), //
+    STEP_DURATION_HFILE_CONVERT("STEP_DURATION_HFILE_CONVERT");
+
+    private final String propertyName;
+    JobPropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    /** Case-insensitive lookup of a constant by its property name; null when the name is null, empty or unknown. */
+    public static JobPropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        String wanted = name.toUpperCase(java.util.Locale.ROOT); // default locale may not map 'i' to 'I' (Turkish)
+        for (JobPropertyEnum property : JobPropertyEnum.values()) {
+            if (property.propertyName.equals(wanted)) {
+                return property;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
new file mode 100644
index 0000000..537388c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+public class JobRecordEventWrapper extends RecordEventWrapper {
+
+    public static final long MIN_SOURCE_SIZE = 33554432L; //32MB per block created by the first step
+
+    public JobRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+        initStats();
+    }
+
+    /** Resets every size and duration counter to zero, then recomputes PER_BYTES_TIME_COST. */
+    public void initStats() {
+        JobPropertyEnum[] counters = { JobPropertyEnum.SOURCE_SIZE, JobPropertyEnum.CUBE_SIZE,
+                JobPropertyEnum.BUILD_DURATION, JobPropertyEnum.WAIT_RESOURCE_TIME,
+                JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS, JobPropertyEnum.STEP_DURATION_DICTIONARY,
+                JobPropertyEnum.STEP_DURATION_INMEM_CUBING, JobPropertyEnum.STEP_DURATION_HFILE_CONVERT };
+        for (JobPropertyEnum counter : counters) {
+            metricsEvent.put(counter.toString(), 0L);
+        }
+        setDependentStats();
+    }
+
+    public void setWrapper(String projectName, String cubeName, String jobId, String jobType, String cubingType) {
+        metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+        metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+        metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType);
+    }
+
+    public void setStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime) {
+        metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize);
+        metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
+        metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration);
+        metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime);
+        setDependentStats();
+    }
+
+    public void setStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) {
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem);
+        metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert);
+    }
+
+    /** Derives PER_BYTES_TIME_COST = (DURATION - WAIT_RESOURCE_TIME) / max(TABLE_SIZE, MIN_SOURCE_SIZE);
+     *  a missing or zero TABLE_SIZE yields 0.0. */
+    private void setDependentStats() {
+        final String costKey = JobPropertyEnum.PER_BYTES_TIME_COST.toString();
+        Long sourceSize = (Long) metricsEvent.get(JobPropertyEnum.SOURCE_SIZE.toString());
+        if (sourceSize == null || sourceSize == 0) {
+            metricsEvent.put(costKey, 0.0);
+            return;
+        }
+        long effectiveSize = Math.max(sourceSize, MIN_SOURCE_SIZE);
+        long buildDuration = (Long) metricsEvent.get(JobPropertyEnum.BUILD_DURATION.toString());
+        long waitTime = (Long) metricsEvent.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString());
+        metricsEvent.put(costKey, (buildDuration - waitTime) * 1.0 / effectiveSize);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
new file mode 100644
index 0000000..5460848
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+public class CubeSegmentRecordEventWrapper extends RecordEventWrapper {
+
+    public CubeSegmentRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+        initStats();
+    }
+
+    /** Starts every storage counter at zero and marks the segment as successful so far. */
+    private void initStats() {
+        PropertyEnum[] counters = { PropertyEnum.CALL_COUNT, PropertyEnum.TIME_SUM, PropertyEnum.TIME_MAX,
+                PropertyEnum.SKIP_COUNT, PropertyEnum.SCAN_COUNT, PropertyEnum.RETURN_COUNT,
+                PropertyEnum.AGGR_FILTER_COUNT, PropertyEnum.AGGR_COUNT };
+        for (PropertyEnum counter : counters) {
+            metricsEvent.put(counter.toString(), 0L);
+        }
+        metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), true);
+    }
+
+    public void setWrapper(String projectName, String cubeName, String segmentName, long sourceCuboidId,
+            long targetCuboidId, long filterMask) {
+        metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(PropertyEnum.CUBE.toString(), cubeName);
+        metricsEvent.put(PropertyEnum.SEGMENT.toString(), segmentName);
+        metricsEvent.put(PropertyEnum.CUBOID_SOURCE.toString(), sourceCuboidId);
+        metricsEvent.put(PropertyEnum.CUBOID_TARGET.toString(), targetCuboidId);
+        metricsEvent.put(PropertyEnum.IF_MATCH.toString(), sourceCuboidId == targetCuboidId);
+        metricsEvent.put(PropertyEnum.FILTER_MASK.toString(), filterMask);
+    }
+
+    public void setWeightPerHit(double weightPerHit) {
+        metricsEvent.put(PropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit);
+    }
+
+    /**
+     * Folds one storage call into the running totals: counters are summed, TIME_MAX keeps the largest
+     * call time, and IF_SUCCESS stays true only while every call so far succeeded.
+     */
+    public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount,
+            boolean ifSuccess) {
+        increase(PropertyEnum.CALL_COUNT, 1L);
+        increase(PropertyEnum.TIME_SUM, callTimeMs);
+        long longestSoFar = (Long) metricsEvent.get(PropertyEnum.TIME_MAX.toString());
+        if (longestSoFar < callTimeMs) {
+            metricsEvent.put(PropertyEnum.TIME_MAX.toString(), callTimeMs);
+        }
+        increase(PropertyEnum.SKIP_COUNT, skipCount);
+        increase(PropertyEnum.SCAN_COUNT, scanCount);
+        increase(PropertyEnum.RETURN_COUNT, returnCount);
+        increase(PropertyEnum.AGGR_FILTER_COUNT, scanCount - returnCount);
+        increase(PropertyEnum.AGGR_COUNT, aggrCount);
+        Boolean successSoFar = (Boolean) metricsEvent.get(PropertyEnum.IF_SUCCESS.toString());
+        metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), successSoFar && ifSuccess);
+    }
+
+    /** Adds delta to the Long currently stored under the given counter. */
+    private void increase(PropertyEnum counter, long delta) {
+        Long current = (Long) metricsEvent.get(counter.toString());
+        metricsEvent.put(counter.toString(), current + delta);
+    }
+
+    public Object getProperty(String key) {
+        return metricsEvent.get(key);
+    }
+
+    public enum PropertyEnum {
+        PROJECT("PROJECT"), CUBE("CUBE_NAME"), SEGMENT("SEGMENT_NAME"), CUBOID_SOURCE("CUBOID_SOURCE"), //
+        CUBOID_TARGET("CUBOID_TARGET"), IF_MATCH("IF_MATCH"), FILTER_MASK("FILTER_MASK"), IF_SUCCESS("IF_SUCCESS"), //
+        TIME_SUM("STORAGE_CALL_TIME_SUM"), TIME_MAX("STORAGE_CALL_TIME_MAX"), WEIGHT_PER_HIT("WEIGHT_PER_HIT"), //
+        CALL_COUNT("STORAGE_CALL_COUNT"), SKIP_COUNT("STORAGE_COUNT_SKIP"), SCAN_COUNT("STORAGE_COUNT_SCAN"), //
+        RETURN_COUNT("STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT("STORAGE_COUNT_AGGREGATE_FILTER"), //
+        AGGR_COUNT("STORAGE_COUNT_AGGREGATE");
+
+        private final String propertyName;
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        /** Case-insensitive lookup of a constant by its property name; null when the name is null, empty or unknown. */
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            String wanted = name.toUpperCase(java.util.Locale.ROOT); // default locale may not map 'i' to 'I' (Turkish)
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(wanted)) {
+                    return property;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
new file mode 100644
index 0000000..937a83d
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+public class QueryRecordEventWrapper extends RecordEventWrapper {
+
+    public QueryRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+        initStats();
+    }
+
+    private void initStats() {
+        metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL");
+        metricsEvent.put(PropertyEnum.TIME_COST.toString(), 0L);
+        metricsEvent.put(PropertyEnum.CALCITE_RETURN_COUNT.toString(), 0L);
+        metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), 0L);
+        setDependentStats();
+    }
+
+    public void setWrapper(long queryHashCode, String queryType, String projectName, String realizationName,
+            int realizationType) {
+        metricsEvent.put(PropertyEnum.ID_CODE.toString(), queryHashCode);
+        metricsEvent.put(PropertyEnum.TYPE.toString(), queryType);
+        metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(PropertyEnum.REALIZATION_TYPE.toString(), realizationType);
+    }
+
+    public void addStorageStats(long addReturnCountByStorage) {
+        long current = (Long) metricsEvent.get(PropertyEnum.STORAGE_RETURN_COUNT.toString());
+        metricsEvent.put(PropertyEnum.STORAGE_RETURN_COUNT.toString(), current + addReturnCountByStorage);
+        setDependentStats(); // AGGR_FILTER_COUNT is derived from STORAGE_RETURN_COUNT, so keep it in sync
+    }
+
+    public void setStats(long callTimeMs, long returnCountByCalcite) {
+        metricsEvent.put(PropertyEnum.TIME_COST.toString(), callTimeMs);
+        metricsEvent.put(PropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite);
+        setDependentStats();
+    }
+
+    /** Derives AGGR_FILTER_COUNT = max(0, STORAGE_RETURN_COUNT - CALCITE_RETURN_COUNT). */
+    private void setDependentStats() {
+        long fromStorage = (Long) metricsEvent.get(PropertyEnum.STORAGE_RETURN_COUNT.toString());
+        long fromCalcite = (Long) metricsEvent.get(PropertyEnum.CALCITE_RETURN_COUNT.toString());
+        metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), Math.max(0L, fromStorage - fromCalcite));
+    }
+
+    public <T extends Throwable> void setStats(Class<T> exceptionClassName) {
+        metricsEvent.put(PropertyEnum.EXCEPTION.toString(), exceptionClassName.getName());
+    }
+
+    public enum PropertyEnum {
+        ID_CODE("QUERY_HASH_CODE"), TYPE("QUERY_TYPE"), PROJECT("PROJECT"), REALIZATION("REALIZATION"), //
+        REALIZATION_TYPE("REALIZATION_TYPE"), EXCEPTION("EXCEPTION"), TIME_COST("QUERY_TIME_COST"), //
+        CALCITE_RETURN_COUNT("CALCITE_COUNT_RETURN"), STORAGE_RETURN_COUNT("STORAGE_COUNT_RETURN"), //
+        AGGR_FILTER_COUNT("CALCITE_COUNT_AGGREGATE_FILTER");
+
+        private final String propertyName;
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        /** Case-insensitive lookup of a constant by its property name; null when the name is null, empty or unknown. */
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            String wanted = name.toUpperCase(java.util.Locale.ROOT); // default locale may not map 'i' to 'I' (Turkish)
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(wanted)) {
+                    return property;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
new file mode 100644
index 0000000..e8774a4
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+/**
+ * Populates a {@link RecordEvent} with the properties describing one RPC call: the project,
+ * realization and RPC server it is attributed to, its call time and row counters, and the
+ * exception class when one is recorded. Property keys are the names held by {@link PropertyEnum}.
+ */
+public class RPCRecordEventWrapper extends RecordEventWrapper {
+
+    public RPCRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+    }
+
+    /**
+     * Sets the identifying properties of the call and initialises EXCEPTION to the literal
+     * string "NULL" (presumably the placeholder for "no exception"; {@link #setStats(Class)}
+     * overwrites it).
+     *
+     * @param projectName     value stored under PROJECT
+     * @param realizationName value stored under REALIZATION
+     * @param rpcServer       value stored under RPC_SERVER
+     */
+    public void setRPCCallWrapper(String projectName, String realizationName, String rpcServer) {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL");
+        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), realizationName);
+        this.metricsEvent.put(PropertyEnum.RPC_SERVER.toString(), rpcServer);
+    }
+
+    /**
+     * Sets the timing and counter properties of the call.
+     *
+     * @param callTimeMs  value stored under CALL_TIME
+     * @param skipCount   number of skips on region servers based on region meta or fuzzy filter
+     * @param scanCount   count scanned by region server
+     * @param returnCount count returned by region server
+     * @param aggrCount   count aggregated by coprocessor
+     */
+    public void setRPCCallStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount) {
+        // Count filtered & aggregated by coprocessor, derived rather than passed in.
+        // NOTE(review): not clamped at zero, so it is negative if returnCount exceeds scanCount.
+        final long aggrFilterCount = scanCount - returnCount;
+
+        this.metricsEvent.put(PropertyEnum.CALL_TIME.toString(), callTimeMs);
+        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount);
+        this.metricsEvent.put(PropertyEnum.SCAN_COUNT.toString(), scanCount);
+        this.metricsEvent.put(PropertyEnum.RETURN_COUNT.toString(), returnCount);
+        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_COUNT.toString(), aggrFilterCount);
+        this.metricsEvent.put(PropertyEnum.AGGR_COUNT.toString(), aggrCount);
+    }
+
+    /**
+     * Records an exception type on the wrapped metrics event.
+     *
+     * @param exceptionClassName class of the throwable; the value of its {@code getName()} is
+     *                           stored under the EXCEPTION property key
+     */
+    public <T extends Throwable> void setStats(Class<T> exceptionClassName) {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), exceptionClassName.getName());
+    }
+
+    /**
+     * Properties recorded on the wrapped metrics event. Each constant carries the property name
+     * that is used as the key in the event; {@link #toString()} returns that name.
+     */
+    public enum PropertyEnum {
+        PROJECT("PROJECT"), REALIZATION("REALIZATION"), RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), //
+        CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), SCAN_COUNT("COUNT_SCAN"), RETURN_COUNT(
+                "COUNT_RETURN"), AGGR_FILTER_COUNT("COUNT_AGGREGATE_FILTER"), AGGR_COUNT("COUNT_AGGREGATE");
+
+        private final String propertyName;
+
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        /**
+         * Looks up a constant by its property name, ignoring case.
+         *
+         * @param name property name, e.g. "CALL_TIME" or "call_time"
+         * @return the matching constant, or {@code null} if {@code name} is null, empty or unknown
+         */
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            // Upper-case once and with a fixed locale: the default-locale variant maps 'i' to a
+            // dotted capital I under e.g. a Turkish locale, so "call_time" would never match.
+            final String upperName = name.toUpperCase(java.util.Locale.ROOT);
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(upperName)) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ffbdf03..ea05c0d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,21 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-core-metrics</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-hive</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-kafka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-engine-mr</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/server-base/pom.xml
----------------------------------------------------------------------
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 07934ed..be21ff1 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -48,10 +48,22 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
 
         <!-- these plug-in modules, should not have API dependencies -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-metrics-reporter-hive</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-metrics-reporter-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/server/src/main/resources/kylinMetrics.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
new file mode 100644
index 0000000..92e391f
--- /dev/null
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -0,0 +1,84 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+            http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
+
+    <description>Kylin Metrics Related Configuration</description>
+
+    <!-- Reservoir beans; both are used as map keys in sourceReporterBindProperties below. -->
+    <bean id="instantReservoir" class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/>
+
+    <!-- NOTE(review): the meaning of the three positional constructor arguments (10, 100, 10)
+         is defined by BlockingReservoir, which is not visible here; confirm before tuning. -->
+    <bean id="blockingReservoir" class="org.apache.kylin.metrics.lib.impl.BlockingReservoir">
+        <constructor-arg index="0">
+            <value>10</value>
+        </constructor-arg>
+        <constructor-arg index="1">
+            <value>100</value>
+        </constructor-arg>
+        <constructor-arg index="2">
+            <value>10</value>
+        </constructor-arg>
+    </bean>
+
+    <!-- Sink beans. Only hiveSink is referenced in this file (by systemCubeSink below). -->
+    <bean id="hiveSink" class="org.apache.kylin.metrics.lib.impl.hive.HiveSink"/>
+
+    <bean id="kafkaSink" class="org.apache.kylin.metrics.lib.impl.kafka.KafkaSink"/>
+
+    <!-- Invokes the static method MetricsManager.setSystemCubeSink with hiveSink as its argument. -->
+    <bean id="systemCubeSink" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass" value="org.apache.kylin.metrics.MetricsManager"/>
+        <property name="targetMethod" value="setSystemCubeSink"/>
+        <property name="arguments">
+            <list>
+                <ref bean="hiveSink"/>
+            </list>
+        </property>
+    </bean>
+
+    <!-- Invokes the static method MetricsManager.setSourceReporterBindProps with a map from each
+         reservoir to a list of Pair(reporter class name, reporter properties). -->
+    <bean id="sourceReporterBindProperties" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass" value="org.apache.kylin.metrics.MetricsManager"/>
+        <property name="targetMethod" value="setSourceReporterBindProps"/>
+        <property name="arguments">
+            <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" value-type="java.util.List">
+                <!-- instantReservoir is bound to the Kafka reporter. -->
+                <entry key-ref="instantReservoir">
+                    <list>
+                        <bean class="org.apache.kylin.common.util.Pair">
+                            <property name="first"
+                                      value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/>
+                            <property name="second">
+                                <props>
+                                    <!-- NOTE(review): "sandbox:9092" looks like a sandbox-specific broker
+                                         address; presumably it must be overridden per deployment. -->
+                                    <prop key="bootstrap.servers">sandbox:9092</prop>
+                                </props>
+                            </property>
+                        </bean>
+                    </list>
+                </entry>
+                <!-- blockingReservoir is bound to the Hive reporter, with no extra properties. -->
+                <entry key-ref="blockingReservoir">
+                    <list>
+                        <bean class="org.apache.kylin.common.util.Pair">
+                            <property name="first"
+                                      value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
+                            <property name="second">
+                                <props>
+                                </props>
+                            </property>
+                        </bean>
+                    </list>
+                </entry>
+            </map>
+        </property>
+    </bean>
+
+</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/server/src/main/webapp/WEB-INF/web.xml b/server/src/main/webapp/WEB-INF/web.xml
index b9cf620..cbce758 100644
--- a/server/src/main/webapp/WEB-INF/web.xml
+++ b/server/src/main/webapp/WEB-INF/web.xml
@@ -43,6 +43,7 @@
         <param-value>
         	classpath:applicationContext.xml
 			classpath:kylinSecurity.xml
+            classpath:kylinMetrics.xml
 			classpath*:kylin-*-plugin.xml
 		</param-value>
     </context-param>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8af4518/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index e2f5258..cf8c2c1 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -45,7 +45,8 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  * @author xduo
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml" })
+@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml",
+        "classpath:kylinMetrics.xml" })
 @ActiveProfiles("testing")
 public class ServiceTestBase extends LocalFileMetadataTestCase {