You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/19 13:43:17 UTC

[1/4] kylin git commit: KYLIN-2944 HLLCSerializer, RawSerializer, PercentileSerializer returns shared object in serialize [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/pr77 eaff61982 -> 556298656 (forced update)


KYLIN-2944 HLLCSerializer, RawSerializer, PercentileSerializer returns shared object in serialize


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3efa9b4f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3efa9b4f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3efa9b4f

Branch: refs/heads/pr77
Commit: 3efa9b4fe17623af1d42ba09b389dabef73d789e
Parents: 8891b1c
Author: shaofengshi <sh...@apache.org>
Authored: Tue Oct 17 22:30:42 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Oct 18 09:14:40 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java   | 2 +-
 .../org/apache/kylin/measure/percentile/PercentileSerializer.java | 2 +-
 .../src/main/java/org/apache/kylin/measure/raw/RawSerializer.java | 3 +--
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3efa9b4f/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index df0cfaf..ddf8281 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -56,7 +56,7 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> {
 
     @Override
     public HLLCounter deserialize(ByteBuffer in) {
-        HLLCounter hllc = current();
+        HLLCounter hllc = new HLLCounter(precision);
         try {
             hllc.readRegisters(in);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/3efa9b4f/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index d7e4204..35230a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -68,7 +68,7 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter>
 
     @Override
     public PercentileCounter deserialize(ByteBuffer in) {
-        PercentileCounter counter = current();
+        PercentileCounter counter = new PercentileCounter(compression);
         counter.readRegisters(in);
         return counter;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3efa9b4f/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
index 68a0273..ea3f376 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
@@ -90,8 +90,7 @@ public class RawSerializer extends DataTypeSerializer<List<ByteArray>> {
 
     @Override
     public List<ByteArray> deserialize(ByteBuffer in) {
-        List<ByteArray> values = current();
-        values.clear();
+        List<ByteArray> values = new ArrayList<>();
         int size = BytesUtil.readVInt(in);
         if (size >= 0) {
             for (int i = 0; i < size; i++) {


[2/4] kylin git commit: KYLIN-2947 Changed the Pop-up box when no project selected

Posted by li...@apache.org.
KYLIN-2947 Changed the Pop-up box when no project selected


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/27ba8c75
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/27ba8c75
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/27ba8c75

Branch: refs/heads/pr77
Commit: 27ba8c75518757fb6503c64915c204cb99e9d089
Parents: 3efa9b4
Author: peng.jianhua <pe...@zte.com.cn>
Authored: Thu Oct 19 16:48:58 2017 +0800
Committer: luguosheng1314 <55...@qq.com>
Committed: Thu Oct 19 17:34:31 2017 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/admin.js | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/27ba8c75/webapp/app/js/controllers/admin.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/admin.js b/webapp/app/js/controllers/admin.js
index 0437942..bbba593 100644
--- a/webapp/app/js/controllers/admin.js
+++ b/webapp/app/js/controllers/admin.js
@@ -116,6 +116,11 @@ KylinApp.controller('AdminCtrl', function ($scope, AdminService, CacheService, T
   }
 
   $scope.calCardinality = function (tableName) {
+    var _project = ProjectModel.selectedProject;
+      if (_project == null){
+        SweetAlert.swal('', "No project selected.", 'info');
+          return;
+        }
     $modal.open({
       templateUrl: 'calCardinality.html',
       controller: CardinalityGenCtrl,


[4/4] kylin git commit: KYLIN-2722 Introduce a new measure for dropwizard metrics framework, called active reservoir, for actively pushing metrics to reporters

Posted by li...@apache.org.
KYLIN-2722 Introduce a new measure for dropwizard metrics framework, called active reservoir, for actively pushing metrics to reporters

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/55629865
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/55629865
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/55629865

Branch: refs/heads/pr77
Commit: 556298656362b0932c30c93ad86e1823ebcdc4f5
Parents: 27ba8c7
Author: Zhong <nj...@apache.org>
Authored: Tue Aug 8 22:50:54 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Thu Oct 19 21:39:09 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  10 +-
 core-metrics/pom.xml                            |  51 ++++
 .../kylin/metrics/lib/ActiveReservoir.java      |  40 +++
 .../metrics/lib/ActiveReservoirFilter.java      |  44 +++
 .../metrics/lib/ActiveReservoirListener.java    |  30 ++
 .../metrics/lib/ActiveReservoirReporter.java    |  51 ++++
 .../org/apache/kylin/metrics/lib/Record.java    |  51 ++++
 .../java/org/apache/kylin/metrics/lib/Sink.java |  23 ++
 .../lib/impl/AbstractActiveReservoir.java       |  68 +++++
 .../metrics/lib/impl/BaseScheduledReporter.java | 103 +++++++
 .../metrics/lib/impl/BlockingReservoir.java     | 167 ++++++++++++
 .../metrics/lib/impl/InstantReservoir.java      |  76 ++++++
 .../kylin/metrics/lib/impl/MetricsSystem.java   | 164 +++++++++++
 .../kylin/metrics/lib/impl/RecordEvent.java     | 272 +++++++++++++++++++
 .../metrics/lib/impl/RecordEventTimeDetail.java |  77 ++++++
 .../metrics/lib/impl/RecordEventWrapper.java    |  61 +++++
 .../kylin/metrics/lib/impl/ReporterBuilder.java |  48 ++++
 .../kylin/metrics/lib/impl/StubReservoir.java   |  54 ++++
 .../metrics/lib/impl/StubReservoirReporter.java |  51 ++++
 .../apache/kylin/metrics/lib/impl/StubSink.java |  30 ++
 .../metrics/lib/impl/TimePropertyEnum.java      |  49 ++++
 metrics-reporter-hive/pom.xml                   |  53 ++++
 .../metrics/lib/impl/hive/HiveProducer.java     | 201 ++++++++++++++
 .../lib/impl/hive/HiveProducerRecord.java       | 196 +++++++++++++
 .../lib/impl/hive/HiveReservoirReporter.java    | 139 ++++++++++
 .../kylin/metrics/lib/impl/hive/HiveSink.java   |  30 ++
 metrics-reporter-kafka/pom.xml                  |  46 ++++
 .../kafka/KafkaActiveReserviorListener.java     | 115 ++++++++
 .../lib/impl/kafka/KafkaReservoirReporter.java  | 139 ++++++++++
 .../kylin/metrics/lib/impl/kafka/KafkaSink.java |  29 ++
 pom.xml                                         |  25 ++
 31 files changed, 2490 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 34d8b7c..f3cf6c0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1295,9 +1295,9 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.valueOf(this.getOptional("kylin.restclient.connection.max-total", "200"));
     }
 
-    /**
-     * metric
-     */
+    // ============================================================================
+    // Metrics
+    // ============================================================================
     public String getCoadhaleMetricsReportClassesNames() {
         return getOptional("kylin.metrics.reporter-classes",
                 "org.apache.kylin.common.metrics.metrics2.JsonFileMetricsReporter,org.apache.kylin.common.metrics.metrics2.JmxMetricsReporter");
@@ -1315,4 +1315,8 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.metrics.perflogger-class", "org.apache.kylin.common.metrics.perflog.PerfLogger");
     }
 
+    public String getMetricsActiveReservoirDefaultClass() {
+        return getOptional("kylin.metrics.active-reservoir-default-class",
+                "org.apache.kylin.metrics.lib.impl.StubReservoir");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
new file mode 100644
index 0000000..e436c97
--- /dev/null
+++ b/core-metrics/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-core-metrics</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Core Metrics</name>
+    <description>Apache Kylin - Core Metrics</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
new file mode 100644
index 0000000..36ab759
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+
+public interface ActiveReservoir extends Closeable {
+
+    int size();
+
+    void update(Record record);
+
+    void addListener(ActiveReservoirListener listener);
+
+    void removeListener(ActiveReservoirListener listener);
+
+    void removeAllListener();
+
+    void setHAListener(ActiveReservoirListener listener);
+
+    void start();
+
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
new file mode 100644
index 0000000..5cffcfc
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+/**
+ * A filter used to determine whether or not an active reservoir should be reported, among other things.
+ */
+public interface ActiveReservoirFilter {
+
+    /**
+     * Matches all active reservoirs, regardless of type or name.
+     */
+    ActiveReservoirFilter ALL = new ActiveReservoirFilter() {
+        @Override
+        public boolean matches(String name, ActiveReservoir activeReservoir) {
+            return true;
+        }
+    };
+
+    /**
+     * Returns {@code true} if the active reservoir matches the filter; {@code false} otherwise.
+     *
+     * @param name      the active reservoir's name
+     * @param activeReservoir    the active reservoir
+     * @return {@code true} if the active reservoir matches the filter
+     */
+    boolean matches(String name, ActiveReservoir activeReservoir);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
new file mode 100644
index 0000000..f64caba
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.EventListener;
+import java.util.List;
+
+public interface ActiveReservoirListener extends EventListener, Closeable {
+
+    boolean onRecordUpdate(final List<Record> records);
+
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
new file mode 100644
index 0000000..6020865
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.base.Strings;
+
+public abstract class ActiveReservoirReporter implements Closeable {
+
+    public static final String KYLIN_PREFIX = "KYLIN";
+
+    public static Pair<String, String> getTableNameSplits(String tableName) {
+        if (Strings.isNullOrEmpty(tableName)) {
+            return null;
+        }
+
+        String[] splits = tableName.split(Pattern.quote("."));
+        int i = 0;
+        String database = splits.length == 1 ? KYLIN_PREFIX : splits[i++];
+        String tableNameOnly = splits[i];
+        return new Pair(database, tableNameOnly);
+    }
+
+    public static String getTableName(Pair<String, String> tableNameSplits) {
+        return tableNameSplits.getFirst() + "." + tableNameSplits.getSecond();
+    }
+
+    public abstract void start();
+
+    public abstract void stop();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
new file mode 100644
index 0000000..a1bce1f
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.util.Map;
+
+public interface Record {
+
+    /**
+     *  For classification
+     */
+    String getType();
+
+    /**
+     *  For keep ordering in the same category
+     */
+    byte[] getKey();
+
+    /**
+     *  For the contents will be used
+     */
+    byte[] getValue();
+
+    /**
+     *  For the raw contents will be used
+     */
+    Map<String, Object> getValueRaw();
+
+    /**
+     *  For the timestamp the record created
+     */
+    Long getTime();
+
+    Record clone();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
new file mode 100644
index 0000000..dff71bd
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+public interface Sink {
+    String getTableFromSubject(String subject);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
new file mode 100644
index 0000000..cc72710
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+
+import com.google.common.collect.Lists;
+
+public abstract class AbstractActiveReservoir implements ActiveReservoir {
+
+    protected List<ActiveReservoirListener> listeners = Lists.newArrayList();
+
+    protected ActiveReservoirListener listenerHA = new StubReservoirReporter().listener;
+
+    protected boolean isReady = false;
+
+    public void addListener(ActiveReservoirListener listener) {
+        listeners.add(listener);
+    }
+
+    public void removeListener(ActiveReservoirListener listener) {
+        listener.close();
+        listeners.remove(listener);
+    }
+
+    public void removeAllListener() {
+        for (ActiveReservoirListener listener : listeners) {
+            listener.close();
+        }
+        listeners.clear();
+    }
+
+    public void setHAListener(ActiveReservoirListener listener) {
+        this.listenerHA = listener;
+    }
+
+    public void start() {
+        isReady = true;
+    }
+
+    public void stop() {
+        isReady = false;
+    }
+
+    public void close() {
+        stop();
+        removeAllListener();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
new file mode 100644
index 0000000..531376a
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public abstract class BaseScheduledReporter implements Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class);
+
+    private final ScheduledExecutorService executor;
+
+    BaseScheduledReporter() {
+        this("default");
+    }
+
+    BaseScheduledReporter(String name) {
+        this(Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("metrics-scheduler-" + name + "-%d").build()));
+    }
+
+    BaseScheduledReporter(ScheduledExecutorService executor) {
+        this.executor = executor;
+    }
+
+    public abstract void report();
+
+    /**
+     * Starts the reporter polling at the given period.
+     *
+     * @param period the amount of time between polls
+     * @param unit   the unit for {@code period}
+     */
+    public void start(long period, TimeUnit unit) {
+        executor.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    report();
+                } catch (RuntimeException ex) {
+                    logger.error("RuntimeException thrown from {}#report. Exception was suppressed.",
+                            BaseScheduledReporter.this.getClass().getSimpleName(), ex);
+                }
+            }
+        }, period, period, unit);
+    }
+
+    /**
+     * Stops the reporter and shuts down its thread of execution.
+     *
+     * Uses the shutdown pattern from http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
+     */
+    public void stop() {
+        executor.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                executor.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                    System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            executor.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Stops the reporter and shuts down its thread of execution.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
new file mode 100644
index 0000000..3301867
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BlockingReservoir extends AbstractActiveReservoir {
+
+    private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class);
+
+    private final BlockingQueue<Record> recordsQueue;
+    private final Thread scheduledReporter;
+    private final int MIN_REPORT_SIZE;
+    private final int MAX_REPORT_SIZE;
+    private final long MAX_REPORT_TIME;
+    private List<Record> records;
+
+    public BlockingReservoir() {
+        this(1, 100);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize) {
+        this(minReportSize, maxReportSize, 10);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize, int MAX_REPORT_TIME) {
+        this.MAX_REPORT_SIZE = maxReportSize;
+        this.MIN_REPORT_SIZE = minReportSize;
+        this.MAX_REPORT_TIME = MAX_REPORT_TIME * 60 * 1000L;
+
+        this.recordsQueue = new LinkedBlockingQueue<>();
+        this.listeners = Lists.newArrayList();
+
+        this.records = Lists.newArrayListWithExpectedSize(MAX_REPORT_SIZE);
+
+        scheduledReporter = new ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build()
+                .newThread(new ReporterRunnable());
+    }
+
+    public void update(Record record) {
+        if (!isReady) {
+            logger.info("Current reservoir is not ready for update record");
+            return;
+        }
+        try {
+            recordsQueue.put(record);
+        } catch (InterruptedException e) {
+            logger.warn("Thread is interrupted during putting value to blocking queue. \n" + e.toString());
+        }
+    }
+
+    public int size() {
+        return recordsQueue.size();
+    }
+
+    private void onRecordUpdate(boolean ifAll) {
+        if (ifAll) {
+            records = Lists.newArrayList();
+            recordsQueue.drainTo(records);
+        } else {
+            records.clear();
+            recordsQueue.drainTo(records, MAX_REPORT_SIZE);
+        }
+
+        boolean ifSucceed = true;
+        for (ActiveReservoirListener listener : listeners) {
+            if (!notifyListenerOfUpdatedRecord(listener, records)) {
+                ifSucceed = false;
+                logger.warn("It fails to notify listener " + listener.toString() + " of updated records "
+                        + records.toString());
+            }
+        }
+        if (!ifSucceed) {
+            notifyListenerHAOfUpdatedRecord(records);
+        }
+    }
+
+    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, List<Record> records) {
+        return listener.onRecordUpdate(records);
+    }
+
+    private boolean notifyListenerHAOfUpdatedRecord(List<Record> records) {
+        logger.info("The HA listener " + listenerHA.toString() + " for updated records " + records.toString()
+                + " will be started");
+        if (!notifyListenerOfUpdatedRecord(listenerHA, records)) {
+            logger.error("The HA listener also fails!!!");
+            return false;
+        }
+        return true;
+    }
+
+    public void start() {
+        super.start();
+        scheduledReporter.start();
+    }
+
+    public void stop() {
+        super.stop();
+        scheduledReporter.interrupt();
+        try {
+            scheduledReporter.join();
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted during join");
+            throw new RuntimeException(e);
+        }
+    }
+
+    class ReporterRunnable implements Runnable {
+
+        public void run() {
+            long startTime = System.currentTimeMillis();
+            while (isReady) {
+                if (size() <= 0) {
+                    logger.info("There's no record in the blocking queue.");
+                    sleep();
+                    startTime = System.currentTimeMillis();
+                    continue;
+                } else if (size() < MIN_REPORT_SIZE && (System.currentTimeMillis() - startTime < MAX_REPORT_TIME)) {
+                    logger.info("The number of records in the blocking queue is less than " + MIN_REPORT_SIZE + //
+                            " and the duration from last reporting is less than " + MAX_REPORT_TIME
+                            + "ms. Will delay to report!");
+                    sleep();
+                    continue;
+                }
+
+                onRecordUpdate(false);
+                startTime = System.currentTimeMillis();
+            }
+            onRecordUpdate(true);
+            logger.info("Reporter finishes reporting metrics.");
+        }
+
+        private void sleep() {
+            try {
+                Thread.sleep(60 * 1000);
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted during running");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
new file mode 100644
index 0000000..41b53cf
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class InstantReservoir extends AbstractActiveReservoir {
+
+    private static final Logger logger = LoggerFactory.getLogger(InstantReservoir.class);
+
+    public void update(Record record) {
+        if (!isReady) {
+            logger.info("Current reservoir is not ready for update record");
+            return;
+        }
+        onRecordUpdate(record);
+    }
+
+    public int size() {
+        return 0;
+    }
+
+    private void onRecordUpdate(Record record) {
+        boolean ifSucceed = true;
+        for (ActiveReservoirListener listener : listeners) {
+            if (!notifyListenerOfUpdatedRecord(listener, record)) {
+                ifSucceed = false;
+                logger.warn(
+                        "It fails to notify listener " + listener.toString() + " of updated record " + record.getKey());
+            }
+        }
+        if (!ifSucceed) {
+            notifyListenerHAOfUpdatedRecord(record);
+        }
+    }
+
+    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, Record record) {
+        List<Record> recordsList = Lists.newArrayList();
+        recordsList.add(record);
+        return listener.onRecordUpdate(recordsList);
+    }
+
+    private boolean notifyListenerHAOfUpdatedRecord(Record record) {
+        logger.info("The HA listener " + listenerHA.toString() + " for updated record " + record.getKey()
+                + " will be started");
+        if (!notifyListenerOfUpdatedRecord(listenerHA, record)) {
+            logger.error("The HA listener also fails!!!");
+            return false;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
new file mode 100644
index 0000000..dc0ab66
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class MetricsSystem extends MetricRegistry {
+    public static final MetricsSystem Metrics = new MetricsSystem();
+    private static final Logger logger = LoggerFactory.getLogger(MetricsSystem.class);
+    private final ConcurrentHashMap<String, ActiveReservoir> activeReservoirs;
+
+    private MetricsSystem() {
+        activeReservoirs = new ConcurrentHashMap<>();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                logger.info("Closing Metrics System");
+                try {
+                    shutdown();
+                } catch (IOException e) {
+                    logger.error("error during shutdown activeReservoirs and listeners", e);
+                }
+                logger.info("Closed Metrics System");
+            }
+        });
+    }
+
+    public void shutdown() throws IOException {
+        for (ActiveReservoir entry : activeReservoirs.values()) {
+            entry.close();
+        }
+    }
+
+    public ActiveReservoir activeReservoir(String name) {
+        return getOrAddActiveReservoir(name);
+    }
+
+    public ActiveReservoir register(String name, ActiveReservoir activeReservoir) {
+        if (name == null || activeReservoir == null) {
+            throw new IllegalArgumentException("neither of name or ActiveReservoir can be null");
+        }
+        final ActiveReservoir existingReservoir = activeReservoirs.putIfAbsent(name, activeReservoir);
+        if (existingReservoir == null) {
+            onActiveReservoirAdded(activeReservoir);
+        } else {
+            throw new IllegalArgumentException("An active reservoir named " + name + " already exists");
+        }
+
+        return activeReservoir;
+    }
+
+    /**
+     * Removes the active reservoir with the given name.
+     *
+     * @param name the name of the active reservoir
+     * @return whether or not the active reservoir was removed
+     */
+    public boolean removeActiveReservoir(String name) {
+        final ActiveReservoir recordReservoir = activeReservoirs.remove(name);
+        if (recordReservoir != null) {
+            onActiveReservoirRemoved(recordReservoir);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Removes all active reservoirs which match the given filter.
+     *
+     * @param filter a filter
+     */
+    public void removeActiveReservoirMatching(ActiveReservoirFilter filter) {
+        for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                removeActiveReservoir(entry.getKey());
+            }
+        }
+    }
+
+    private void onActiveReservoirAdded(ActiveReservoir activeReservoir) {
+        activeReservoir.start();
+    }
+
+    private void onActiveReservoirRemoved(ActiveReservoir activeReservoir) {
+        try {
+            activeReservoir.close();
+        } catch (IOException e) {
+        }
+    }
+
+    /**
+     * Returns a map of all the active reservoirs in the metrics system and their names.
+     *
+     * @return all the active reservoirs in the metrics system
+     */
+    public SortedMap<String, ActiveReservoir> getActiveReservoirs() {
+        return getActiveReservoirs(ActiveReservoirFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the active reservoirs in the metrics system and their names which match the given filter.
+     *
+     * @param filter    the active reservoir filter to match
+     * @return all the active reservoirs in the metrics system
+     */
+    public SortedMap<String, ActiveReservoir> getActiveReservoirs(ActiveReservoirFilter filter) {
+        final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>();
+        for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                reservoirs.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return Collections.unmodifiableSortedMap(reservoirs);
+    }
+
+    private ActiveReservoir getOrAddActiveReservoir(String name) {
+        ActiveReservoir activeReservoir = activeReservoirs.get(name);
+        if (activeReservoir != null) {
+            return activeReservoir;
+        } else {
+            String defaultActiveReservoirClass = KylinConfig.getInstanceFromEnv()
+                    .getMetricsActiveReservoirDefaultClass();
+            try {
+                activeReservoir = (ActiveReservoir) Class.forName(defaultActiveReservoirClass).getConstructor()
+                        .newInstance();
+            } catch (Exception e) {
+                logger.warn(
+                        "Failed to initialize the " + defaultActiveReservoirClass + ". The StubReservoir will be used");
+                activeReservoir = new StubReservoir();
+            }
+            return register(name, activeReservoir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
new file mode 100644
index 0000000..f5bc797
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metrics.lib.Record;
+
+import com.google.common.collect.Maps;
+
+public class RecordEvent implements Record, Map<String, Object>, Serializable {
+
+    private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>();
+
+    static String localHostname;
+    static {
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            localHostname = addr.getHostName() + ":" + addr.getHostAddress();
+        } catch (UnknownHostException e) {
+            localHostname = "Unknown";
+        }
+    }
+
+    private final Map<String, Object> backingMap;
+
+    private RecordEvent(Map<String, Object> map) {
+        this.backingMap = map;
+    }
+
+    public RecordEvent(String eventType) {
+        this(eventType, localHostname);
+    }
+
+    public RecordEvent(String eventType, long time) {
+        this(eventType, localHostname, time);
+    }
+
+    public RecordEvent(String eventType, String host) {
+        this(eventType, host, System.currentTimeMillis());
+    }
+
+    public RecordEvent(String eventType, String host, long time) {
+        this(null, eventType, host, time);
+    }
+
+    /**
+     *
+     * @param map
+     * @param eventType     mandatory   with null check
+     * @param host          mandatory   without null check
+     * @param time          mandatory   with null check
+     */
+    public RecordEvent(Map<String, Object> map, String eventType, String host, long time) {
+        backingMap = map != null ? map : Maps.<String, Object> newHashMap();
+        setEventType(eventType);
+        setHost(host);
+        setTime(time);
+    }
+
+    public String getEventType() {
+        return (String) get(RecordReserveKeyEnum.TYPE.toString());
+    }
+
+    private void setEventType(String eventType) {
+        if (eventType == null) {
+            throw new IllegalArgumentException("EventType cannot be null.");
+        }
+        put(RecordReserveKeyEnum.TYPE.toString(), eventType);
+    }
+
+    public String getHost() {
+        return (String) get(RecordReserveKeyEnum.HOST.toString());
+    }
+
+    private void setHost(String host) {
+        put(RecordReserveKeyEnum.HOST.toString(), host);
+    }
+
+    public Long getTime() {
+        return (Long) get(RecordReserveKeyEnum.TIME.toString());
+    }
+
+    private void setTime(Long time) {
+        if (time == null) {
+            throw new IllegalArgumentException("Time cannot be null.");
+        }
+        put(RecordReserveKeyEnum.TIME.toString(), time);
+    }
+
+    public void resetTime() {
+        setTime(System.currentTimeMillis());
+    }
+
+    public String getID() {
+        return (String) get(RecordReserveKeyEnum.ID.toString());
+    }
+
+    public void setID(String id) {
+        put(RecordReserveKeyEnum.ID.toString(), id);
+    }
+
+    @Override
+    public void clear() {
+        backingMap.clear();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return backingMap.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return backingMap.containsValue(value);
+    }
+
+    @Override
+    public Set<Entry<String, Object>> entrySet() {
+        return backingMap.entrySet();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o) || backingMap.equals(o);
+    }
+
+    @Override
+    public Object get(Object key) {
+        return backingMap.get(key);
+    }
+
+    @Override
+    public int hashCode() {
+        return backingMap.hashCode();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return backingMap.isEmpty();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backingMap.keySet();
+    }
+
+    @Override
+    public Object put(String key, Object value) {
+        return backingMap.put(key, value);
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends Object> t) {
+        backingMap.putAll(t);
+    }
+
+    @Override
+    public Object remove(Object key) {
+        return backingMap.remove(key);
+    }
+
+    @Override
+    public int size() {
+        return backingMap.size();
+    }
+
+    @Override
+    public String toString() {
+        return backingMap.toString();
+    }
+
+    @Override
+    public Collection<Object> values() {
+        return backingMap.values();
+    }
+
+    @Override
+    public String getType() {
+        return getEventType();
+    }
+
+    @Override
+    public byte[] getKey() {
+        return (getHost() + "-" + getTime() + "-" + getID()).getBytes();
+    }
+
+    @Override
+    /**
+     * Event type and time does not belong to value part
+     */
+    public Map<String, Object> getValueRaw() {
+        Map<String, Object> cloneMap = Maps.newHashMap(backingMap);
+        cloneMap.remove(RecordReserveKeyEnum.TYPE.toString());
+        return cloneMap;
+    }
+
+    @Override
+    /**
+     * Event type does not belong to value part, it's for classification
+     */
+    public byte[] getValue() {
+        try {
+            ByteArrayOutputStream baos = _localBaos.get();
+            if (baos == null) {
+                baos = new ByteArrayOutputStream();
+                _localBaos.set(baos);
+            }
+            baos.reset();
+            JsonUtil.writeValue(baos, getValueRaw());
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);//in mem, should not happen
+        }
+    }
+
+    @Override
+    public RecordEvent clone() {
+        Map<String, Object> cloneMap = Maps.newHashMap();
+        cloneMap.putAll(backingMap);
+        return new RecordEvent(cloneMap);
+    }
+
+    public enum RecordReserveKeyEnum {
+        TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP");
+
+        private final String reserveKey;
+
+        private RecordReserveKeyEnum(String key) {
+            this.reserveKey = key;
+        }
+
+        @Override
+        public String toString() {
+            return reserveKey;
+        }
+
+        public RecordReserveKeyEnum getByKey(String key) {
+            for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) {
+                if (reserveKey.reserveKey == key) {
+                    return reserveKey;
+                }
+            }
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
new file mode 100644
index 0000000..ff97b9b
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+
+public class RecordEventTimeDetail {
+    private static final TimeZone timeZone;
+    private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+    private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+
+    static {
+        timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone());
+    }
+
+    public final String year_begin_date;
+    public final String month_begin_date;
+    public final String date;
+    public final String time;
+    public final int hour;
+    public final int minute;
+    public final int second;
+    public final String week_begin_date;
+
+    public RecordEventTimeDetail(long timeStamp) {
+        Calendar calendar = Calendar.getInstance(timeZone);
+        calendar.setTimeInMillis(timeStamp);
+
+        SimpleDateFormat dateFormat = dateFormatThreadLocal.get();
+        if (dateFormat == null) {
+            dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+            dateFormat.setTimeZone(timeZone);
+            dateFormatThreadLocal.set(dateFormat);
+        }
+        SimpleDateFormat timeFormat = timeFormatThreadLocal.get();
+        if (timeFormat == null) {
+            timeFormat = new SimpleDateFormat("HH:mm:ss");
+            timeFormat.setTimeZone(timeZone);
+            timeFormatThreadLocal.set(timeFormat);
+        }
+
+        String yearStr = String.format("%04d", calendar.get(Calendar.YEAR));
+        String monthStr = String.format("%02d", calendar.get(Calendar.MONTH) + 1);
+        this.year_begin_date = yearStr + "-01-01";
+        this.month_begin_date = yearStr + "-" + monthStr + "-01";
+        this.date = dateFormat.format(calendar.getTime());
+        this.time = timeFormat.format(calendar.getTime());
+        this.hour = calendar.get(Calendar.HOUR_OF_DAY);
+        this.minute = calendar.get(Calendar.MINUTE);
+        this.second = calendar.get(Calendar.SECOND);
+
+        long timeStampForWeekBegin = timeStamp;
+        timeStampForWeekBegin -= 3600000 * 24 * (calendar.get(Calendar.DAY_OF_WEEK) - 1);
+        calendar.setTimeInMillis(timeStampForWeekBegin);
+        this.week_begin_date = dateFormat.format(calendar.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
new file mode 100644
index 0000000..7031129
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Serializable;
+
+import org.apache.kylin.metrics.lib.Record;
+
+public class RecordEventWrapper implements Serializable {
+
+    protected final RecordEvent metricsEvent;
+
+    public RecordEventWrapper(RecordEvent metricsEvent) {
+        this.metricsEvent = metricsEvent;
+
+        //Add time details
+        addTimeDetails();
+    }
+
+    private void addTimeDetails() {
+        RecordEventTimeDetail dateDetail = new RecordEventTimeDetail(metricsEvent.getTime());
+        metricsEvent.put(TimePropertyEnum.YEAR.toString(), dateDetail.year_begin_date);
+        metricsEvent.put(TimePropertyEnum.MONTH.toString(), dateDetail.month_begin_date);
+        metricsEvent.put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), dateDetail.week_begin_date);
+        metricsEvent.put(TimePropertyEnum.DAY_DATE.toString(), dateDetail.date);
+        metricsEvent.put(TimePropertyEnum.DAY_TIME.toString(), dateDetail.time);
+        metricsEvent.put(TimePropertyEnum.TIME_HOUR.toString(), dateDetail.hour);
+        metricsEvent.put(TimePropertyEnum.TIME_MINUTE.toString(), dateDetail.minute);
+        metricsEvent.put(TimePropertyEnum.TIME_SECOND.toString(), dateDetail.second);
+    }
+
+    public void resetTime() {
+        metricsEvent.resetTime();
+        addTimeDetails();
+    }
+
+    public Record getMetricsRecord() {
+        return metricsEvent;
+    }
+
+    @Override
+    public String toString() {
+        return metricsEvent.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
new file mode 100644
index 0000000..22fadd3
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+
+public abstract class ReporterBuilder {
+    protected final ActiveReservoir registry;
+    protected final Properties props;
+
+    protected ReporterBuilder(ActiveReservoir activeReservoir) {
+        this.registry = activeReservoir;
+        this.props = new Properties();
+    }
+
+    public ReporterBuilder setConfig(Properties props) {
+        if (props != null) {
+            this.props.putAll(props);
+        }
+        return this;
+    }
+
+    /**
+     * Builds a {@link ActiveReservoirReporter} with the given properties.
+     *
+     * @return a {@link ActiveReservoirReporter}
+     */
+    public abstract ActiveReservoirReporter build() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
new file mode 100644
index 0000000..fe69dec
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+
+public class StubReservoir implements ActiveReservoir {
+
+    public void addListener(ActiveReservoirListener listener) {
+    }
+
+    public void removeListener(ActiveReservoirListener listener) {
+    }
+
+    public void removeAllListener() {
+    }
+
+    public void setHAListener(ActiveReservoirListener listener) {
+    }
+
+    public void update(Record record) {
+    }
+
+    public int size() {
+        return 0;
+    }
+
+    public void start() {
+    }
+
+    public void stop() {
+    }
+
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
new file mode 100644
index 0000000..5e0e637
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+
+public class StubReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String STUB_REPORTER_SUFFIX = "STUB";
+
+    public final StubReservoirListener listener = new StubReservoirListener();
+
+    public void start() {
+    }
+
+    public void stop() {
+    }
+
+    public void close() {
+    }
+
+    private class StubReservoirListener implements ActiveReservoirListener {
+
+        public boolean onRecordUpdate(final List<Record> records) {
+            return true;
+        }
+
+        public void close() {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
new file mode 100644
index 0000000..676c59c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static org.apache.kylin.metrics.lib.impl.StubReservoirReporter.STUB_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class StubSink implements Sink {
+    public String getTableFromSubject(String subject) {
+        return KYLIN_PREFIX + "." + STUB_REPORTER_SUFFIX + "_" + subject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
new file mode 100644
index 0000000..1336843
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import com.google.common.base.Strings;
+
+public enum TimePropertyEnum {
+    YEAR("KYEAR_BEGIN_DATE"), MONTH("KMONTH_BEGIN_DATE"), WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), DAY_DATE(
+            "KDAY_DATE"), DAY_TIME(
+                    "KDAY_TIME"), TIME_HOUR("KTIME_HOUR"), TIME_MINUTE("KTIME_MINUTE"), TIME_SECOND("KTIME_SECOND");
+
+    private final String propertyName;
+
+    TimePropertyEnum(String propertyName) {
+        this.propertyName = propertyName;
+    }
+
+    public static TimePropertyEnum getByPropertyName(String propertyName) {
+        if (Strings.isNullOrEmpty(propertyName)) {
+            return null;
+        }
+        for (TimePropertyEnum property : TimePropertyEnum.values()) {
+            if (property.propertyName.equals(propertyName.toUpperCase())) {
+                return property;
+            }
+        }
+        return null;
+    }
+
+    public String toString() {
+        return propertyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
new file mode 100644
index 0000000..e25636e
--- /dev/null
+++ b/metrics-reporter-hive/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-metrics-reporter-hive</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Metrics Reporter Hive</name>
+    <description>Apache Kylin - Metrics Reporter Hive</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
new file mode 100644
index 0000000..26a81e3
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class HiveProducer {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class);
+
+    private static final int CACHE_MAX_SIZE = 10;
+
+    private final HiveConf hiveConf;
+    private final FileSystem hdfs;
+    private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
+    private final String CONTENT_FILE_NAME;
+
+    public HiveProducer(Properties props) throws Exception {
+        this(props, new HiveConf());
+    }
+
+    HiveProducer(Properties props, HiveConf hiveConfig) throws Exception {
+        hiveConf = hiveConfig;
+        for (Map.Entry<Object, Object> e : props.entrySet()) {
+            hiveConf.set(e.getKey().toString(), e.getValue().toString());
+        }
+
+        hdfs = FileSystem.get(hiveConf);
+
+        tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
+            @Override
+            public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String, List<FieldSchema>>> notification) {
+                logger.info("Field schema with table " + ActiveReservoirReporter.getTableName(notification.getKey()) + " is removed due to " + notification.getCause());
+            }
+        }).maximumSize(CACHE_MAX_SIZE).build(new CacheLoader<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
+            @Override
+            public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception {
+                HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
+                String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()).getSd().getLocation();
+                List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
+                metaStoreClient.close();
+                return new Pair(tableLocation, fields);
+            }
+        });
+
+        String hostName;
+        try {
+            hostName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            hostName = "UNKNOWN";
+        }
+        CONTENT_FILE_NAME = hostName + "-part-0000";
+    }
+
+    public void close() {
+        tableFieldSchemaCache.cleanUp();
+    }
+
+    public void send(final Record record) throws Exception {
+        HiveProducerRecord hiveRecord = convertTo(record);
+        write(hiveRecord.key(), Lists.newArrayList(hiveRecord));
+    }
+
+    public void send(final List<Record> recordList) throws Exception {
+        Map<RecordKey, List<HiveProducerRecord>> recordMap = Maps.newHashMap();
+        for (Record record : recordList) {
+            HiveProducerRecord hiveRecord = convertTo(record);
+            if (recordMap.get(hiveRecord.key()) == null) {
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+            }
+            recordMap.get(hiveRecord.key()).add(hiveRecord);
+        }
+
+        for (Map.Entry<RecordKey, List<HiveProducerRecord>> entry : recordMap.entrySet()) {
+            write(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception {
+        String tableLocation = tableFieldSchemaCache.get(new Pair<>(recordKey.database(), recordKey.table())).getFirst();
+        StringBuilder sb = new StringBuilder();
+        sb.append(tableLocation);
+        for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
+            sb.append("/");
+            sb.append(e.getKey().toLowerCase());
+            sb.append("=");
+            sb.append(e.getValue());
+        }
+        Path partitionPath = new Path(sb.toString());
+        if (!hdfs.exists(partitionPath)) {
+            StringBuilder hql = new StringBuilder();
+            hql.append("ALTER TABLE ");
+            hql.append(recordKey.database() + "." + recordKey.table());
+            hql.append(" ADD IF NOT EXISTS PARTITION (");
+            boolean ifFirst = true;
+            for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
+                if (ifFirst) {
+                    ifFirst = false;
+                } else {
+                    hql.append(",");
+                }
+                hql.append(e.getKey().toLowerCase());
+                hql.append("='" + e.getValue() + "'");
+            }
+            hql.append(")");
+            Driver driver = new Driver(hiveConf);
+            SessionState.start(new CliSessionState(hiveConf));
+            driver.run(hql.toString());
+            driver.close();
+        }
+        Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME);
+        if (!hdfs.exists(partitionContentPath)) {
+            int nRetry = 0;
+            while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) {
+                if (hdfs.exists(partitionContentPath)) {
+                    break;
+                }
+                Thread.sleep(500L * nRetry);
+            }
+            if (!hdfs.exists(partitionContentPath)) {
+                throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
+            }
+        }
+        try (FSDataOutputStream fout = hdfs.append(partitionContentPath)) {
+            for (HiveProducerRecord elem : recordItr) {
+                fout.writeBytes(elem.valueToString() + "\n");
+            }
+        } catch (IOException e) {
+            System.out.println("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
+            logger.error("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
+        }
+    }
+
+    private HiveProducerRecord convertTo(Record record) throws Exception {
+        Map<String, Object> rawValue = record.getValueRaw();
+
+        //Set partition values for hive table
+        Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
+        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString());
+
+        return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()), partitionKVs, rawValue);
+    }
+
+    public HiveProducerRecord parseToHiveProducerRecord(String tableName, Map<String, String> partitionKVs, Map<String, Object> rawValue) throws Exception {
+        Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableName);
+        List<FieldSchema> fields = tableFieldSchemaCache.get(tableNameSplits).getSecond();
+        List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size());
+        for (FieldSchema fieldSchema : fields) {
+            columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase()));
+        }
+
+        return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
new file mode 100644
index 0000000..8bf93ec
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Strings;
+
+public class HiveProducerRecord {
+
+    public static final String DELIMITER = ",";
+
+    private final RecordKey key;
+    private final List<Object> value;
+
+    public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
+        this.key = new RecordKey(dbName, tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
+        this.key = new RecordKey(tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
+        this.key = new RecordKey(dbName, tableName);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, List<Object> value) {
+        this.key = new RecordKey(tableName);
+        this.value = value;
+    }
+
+    public RecordKey key() {
+        return this.key;
+    }
+
+    public List<Object> value() {
+        return this.value;
+    }
+
+    public String toString() {
+        String value = this.value == null ? "null" : this.value.toString();
+        return "HiveProducerRecord(key=" + this.key.toString() + ", value=" + value + ")";
+    }
+
+    public String valueToString() {
+        if (this.value == null || value.isEmpty()) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < value.size() - 1; i++) {
+            sb.append(value.get(i) + DELIMITER);
+        }
+        sb.append(value.get(value.size() - 1));
+        return sb.toString();
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (!(o instanceof HiveProducerRecord)) {
+            return false;
+        } else {
+            HiveProducerRecord that = (HiveProducerRecord) o;
+            if (this.key != null) {
+                if (!this.key.equals(that.key)) {
+                    return false;
+                }
+            } else if (that.key != null) {
+                return false;
+            }
+            if (this.value != null) {
+                if (!this.value.equals(that.value)) {
+                    return false;
+                }
+            } else if (that.value != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public int hashCode() {
+        int result = this.key != null ? this.key.hashCode() : 0;
+        result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
+        return result;
+    }
+
+    public class RecordKey {
+        public static final String DEFAULT_DB_NAME = "DEFAULT";
+
+        private final String dbName;
+        private final String tableName;
+        private final Map<String, String> partitionKVs;
+
+        public RecordKey(String dbName, String tableName, Map<String, String> partitionKVs) {
+            if (Strings.isNullOrEmpty(dbName)) {
+                this.dbName = DEFAULT_DB_NAME;
+            } else {
+                this.dbName = dbName;
+            }
+            this.tableName = tableName;
+            this.partitionKVs = partitionKVs;
+        }
+
+        public RecordKey(String tableName, Map<String, String> partitionKVs) {
+            this(null, tableName, partitionKVs);
+        }
+
+        public RecordKey(String dbName, String tableName) {
+            this(dbName, tableName, null);
+        }
+
+        public RecordKey(String tableName) {
+            this(null, tableName, null);
+        }
+
+        public String database() {
+            return this.dbName;
+        }
+
+        public String table() {
+            return this.tableName;
+        }
+
+        public Map<String, String> partition() {
+            return this.partitionKVs;
+        }
+
+        public String toString() {
+            String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString();
+            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")";
+        }
+
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            } else if (!(o instanceof RecordKey)) {
+                return false;
+            } else {
+                RecordKey that = (RecordKey) o;
+                if (this.dbName != null) {
+                    if (!this.dbName.equals(that.dbName)) {
+                        return false;
+                    }
+                } else if (that.dbName != null) {
+                    return false;
+                }
+
+                if (this.tableName != null) {
+                    if (!this.tableName.equals(that.tableName)) {
+                        return false;
+                    }
+                } else if (that.tableName != null) {
+                    return false;
+                }
+
+                if (this.partitionKVs != null) {
+                    if (!this.partitionKVs.equals(that.partitionKVs)) {
+                        return false;
+                    }
+                } else if (that.partitionKVs != null) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public int hashCode() {
+            int result = this.dbName != null ? this.dbName.hashCode() : 0;
+            result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0);
+            result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0);
+            return result;
+        }
+    }
+}


[3/4] kylin git commit: KYLIN-2722 Introduce a new measure for dropwizard metrics framework, called active reservoir, for actively pushing metrics to reporters

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
new file mode 100644
index 0000000..5af2bf9
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records and publishes them to hive.
+ */
+public class HiveReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String HIVE_REPORTER_SUFFIX = "HIVE";
+    public static final HiveSink sink = new HiveSink();
+    protected static final Logger logger = LoggerFactory.getLogger(HiveReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final HiveReservoirListener listener;
+
+    public HiveReservoirReporter(ActiveReservoir activeReservoir, Properties props) throws Exception {
+        this.activeReservoir = activeReservoir;
+        this.listener = new HiveReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link HiveReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link HiveReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link HiveReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        private void setFixedProperties() {
+        }
+
+        /**
+         * Builds a {@link HiveReservoirReporter} with the given properties.
+         *
+         * @return a {@link HiveReservoirReporter}
+         */
+        public HiveReservoirReporter build() throws Exception {
+            setFixedProperties();
+            return new HiveReservoirReporter(registry, props);
+        }
+    }
+
+    private class HiveReservoirListener implements ActiveReservoirListener {
+
+        HiveProducer producer;
+
+        private HiveReservoirListener(Properties props) throws Exception {
+            producer = new HiveProducer(props);
+        }
+
+        public boolean onRecordUpdate(final List<Record> records) {
+            try {
+                producer.send(records);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+            return true;
+        }
+
+        public boolean onRecordUpdate(final Record record) {
+            try {
+                producer.send(record);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+            return true;
+        }
+
+        public void close() {
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
new file mode 100644
index 0000000..3b0eefe
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter.HIVE_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class HiveSink implements Sink {
+    public String getTableFromSubject(String subject) {
+        return KYLIN_PREFIX + "." + HIVE_REPORTER_SUFFIX + "_" + subject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
new file mode 100644
index 0000000..ae9fb88
--- /dev/null
+++ b/metrics-reporter-kafka/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-metrics-reporter-kafka</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Metrics Reporter Kafka</name>
+    <description>Apache Kylin - Metrics Reporter Kafka</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
new file mode 100644
index 0000000..311f3e3
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class KafkaActiveReserviorListener implements ActiveReservoirListener {
+
+    public static final long TOPIC_AVAILABLE_TAG = 0L;
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaActiveReserviorListener.class);
+    protected Long maxBlockMs = 1800000L;
+    protected int maxRecordForLogNum = 10000;
+    protected int maxRecordSkipForLogNum = 10000;
+    protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>();
+    private int nRecord = 0;
+    private int nRecordSkip = 0;
+    private Callback produceCallback = new Callback() {
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                exception.printStackTrace();
+                return;
+            }
+            logger.info("topic:" + metadata.topic() + "; partition: " + metadata.partition() + "; offset: " + metadata.offset());
+        }
+    };
+
+    protected abstract String decorateTopic(String topic);
+
+    protected abstract void tryFetchMetadataFor(String topic);
+
+    protected abstract void send(String topic, Record record, Callback callback);
+
+    protected void sendWrapper(String topic, Record record, Callback callback) {
+        try {
+            send(topic, record, callback);
+        } catch (org.apache.kafka.common.errors.TimeoutException e) {
+            setUnAvailable(topic);
+            throw e;
+        }
+    }
+
+    public boolean onRecordUpdate(final List<Record> records) {
+        try {
+            for (Record record : records) {
+                String topic = decorateTopic(record.getType());
+                if (!checkAvailable(topic)) {
+                    if (nRecordSkip % maxRecordSkipForLogNum == 0) {
+                        nRecordSkip = 0;
+                        logger.warn("Skip to send record to topic " + topic);
+                    }
+                    nRecordSkip++;
+                    continue;
+                }
+                if (nRecord % maxRecordForLogNum == 0) {
+                    nRecord = 0;
+                    sendWrapper(topic, record, produceCallback);
+                } else {
+                    sendWrapper(topic, record, null);
+                }
+                nRecord++;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    protected boolean checkAvailable(String topic) {
+        Long timeBlock = topicsIfAvailable.get(topic);
+        if (timeBlock != null && timeBlock == TOPIC_AVAILABLE_TAG) {
+            return true;
+        } else if (timeBlock == null || System.currentTimeMillis() - timeBlock > maxBlockMs) {
+            try {
+                tryFetchMetadataFor(topic);
+                topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG);
+                return true;
+            } catch (org.apache.kafka.common.errors.TimeoutException e) {
+                logger.warn("Fail to fetch metadata for topic " + topic);
+                setUnAvailable(topic);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    protected void setUnAvailable(String topic) {
+        topicsIfAvailable.put(topic, System.currentTimeMillis());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
new file mode 100644
index 0000000..a5ea3aa
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records and publishes them to Kafka.
+ */
+public class KafkaReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String KAFKA_REPORTER_SUFFIX = "KAFKA";
+    public static final KafkaSink sink = new KafkaSink();
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final KafkaReservoirListener listener;
+
+    private KafkaReservoirReporter(ActiveReservoir activeReservoir, Properties props) {
+        this.activeReservoir = activeReservoir;
+        this.listener = new KafkaReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link KafkaReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link KafkaReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    private static String decorateTopic(String topic) {
+        return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic;
+    }
+
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Stops the reporter.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link KafkaReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        private void setFixedProperties() {
+            props.put("key.serializer", ByteArraySerializer.class.getName());
+            props.put("value.serializer", ByteArraySerializer.class.getName());
+        }
+
+        /**
+         * Builds a {@link KafkaReservoirReporter} with the given properties.
+         *
+         * @return a {@link KafkaReservoirReporter}
+         */
+        public KafkaReservoirReporter build() {
+            setFixedProperties();
+            return new KafkaReservoirReporter(registry, props);
+        }
+    }
+
+    private class KafkaReservoirListener extends KafkaActiveReserviorListener {
+        protected final Producer<byte[], byte[]> producer;
+
+        private KafkaReservoirListener(Properties props) {
+            producer = new KafkaProducer<>(props);
+        }
+
+        public void tryFetchMetadataFor(String topic) {
+            producer.partitionsFor(topic);
+        }
+
+        protected String decorateTopic(String topic) {
+            return KafkaReservoirReporter.decorateTopic(topic);
+        }
+
+        protected void send(String topic, Record record, Callback callback) {
+            producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
+        }
+
+        public void close() {
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
new file mode 100644
index 0000000..f756b8a
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class KafkaSink implements Sink {
+    public String getTableFromSubject(String subject) {
+        return KYLIN_PREFIX + "." + KafkaReservoirReporter.KAFKA_REPORTER_SUFFIX + "_" + subject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de5da05..ffbdf03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,21 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-core-metrics</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-hive</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-kafka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-core-metadata</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -650,6 +665,13 @@
                 <version>${slf4j.version}</version>
             </dependency>
 
+            <!-- Metrics -->
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard.version}</version>
+            </dependency>
+
             <!-- Test -->
             <dependency>
                 <groupId>junit</groupId>
@@ -1126,6 +1148,9 @@
         <module>tool-assembly</module>
         <module>kylin-it</module>
         <module>tomcat-ext</module>
+        <module>core-metrics</module>
+        <module>metrics-reporter-hive</module>
+        <module>metrics-reporter-kafka</module>
     </modules>
 
     <reporting>