You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/13 05:39:38 UTC

[kylin] branch master-hadoop3 updated: Revert "KYLIN-4508 Add unit test for core-metrics module & reporters"

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master-hadoop3 by this push:
     new d8af2c8  Revert "KYLIN-4508 Add unit test for core-metrics module & reporters"
d8af2c8 is described below

commit d8af2c8385535f804009bbbca1f618918de71702
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Tue Oct 13 13:37:01 2020 +0800

    Revert "KYLIN-4508 Add unit test for core-metrics module & reporters"
    
    This reverts commit 8d894e2c8098a41d3778933cc93484a2c5f582e3.
---
 .../kylin/metrics/lib/impl/TimedRecordEvent.java   |  32 ----
 metrics-reporter-hive/pom.xml                      |  25 ----
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  |   7 +-
 .../metrics/lib/impl/hive/HiveProducerRecord.java  | 141 +++++++++++-------
 .../lib/impl/hive/HiveReservoirReporter.java       |  39 ++---
 .../lib/impl/hive/HiveProducerRecordTest.java      |  81 -----------
 .../metrics/lib/impl/hive/HiveProducerTest.java    | 161 ---------------------
 .../lib/impl/hive/HiveReservoirReporterTest.java   |  88 -----------
 metrics-reporter-kafka/pom.xml                     |  19 ---
 .../impl/kafka/KafkaActiveReserviorListener.java   |   8 -
 .../lib/impl/kafka/KafkaReservoirReporter.java     |   6 +-
 .../lib/impl/kafka/KafkaReservoirReporterTest.java |  79 ----------
 .../tool/metrics/systemcube/HiveTableCreator.java  |   3 +-
 13 files changed, 104 insertions(+), 585 deletions(-)

diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
index 984d5f5..a866163 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -44,36 +44,4 @@ public class TimedRecordEvent extends RecordEvent {
         super.resetTime();
         addTimeDetails();
     }
-
-    public String getYear() {
-        return (String) get(TimePropertyEnum.YEAR.toString());
-    }
-
-    public String getMonth() {
-        return (String) get(TimePropertyEnum.MONTH.toString());
-    }
-
-    public String getWeekBeginDate() {
-        return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString());
-    }
-
-    public String getDayDate() {
-        return (String) get(TimePropertyEnum.DAY_DATE.toString());
-    }
-
-    public String getDayTime() {
-        return (String) get(TimePropertyEnum.DAY_TIME.toString());
-    }
-
-    public int getTimeHour() {
-        return (int) get(TimePropertyEnum.TIME_HOUR.toString());
-    }
-
-    public int getTimeMinute() {
-        return (int) get(TimePropertyEnum.TIME_MINUTE.toString());
-    }
-
-    public int getTimeSecond() {
-        return (int) get(TimePropertyEnum.TIME_SECOND.toString());
-    }
 }
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
index 87b7808..0159804 100644
--- a/metrics-reporter-hive/pom.xml
+++ b/metrics-reporter-hive/pom.xml
@@ -56,30 +56,5 @@
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
         </dependency>
-
-        <!-- Env & Test -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-mockito</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4-rule-agent</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index e79010c..8bc7a43 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -309,7 +309,7 @@ public class HiveProducer {
         fout = null;
     }
 
-    HiveProducerRecord convertTo(Record record) throws Exception {
+    private HiveProducerRecord convertTo(Record record) throws Exception {
         Map<String, Object> rawValue = record.getValueRaw();
 
         //Set partition values for hive table
@@ -330,8 +330,7 @@ public class HiveProducer {
             columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
         }
 
-        HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond())
-                .setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build();
-        return new HiveProducerRecord(key, columnValues);
+        return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs,
+                columnValues);
     }
 }
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
index fa5222f..650d18a 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -30,8 +30,23 @@ public class HiveProducerRecord {
     private final RecordKey key;
     private final List<Object> value;
 
-    public HiveProducerRecord(RecordKey key, List<Object> value) {
-        this.key = key;
+    public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
+        this.key = new RecordKey(dbName, tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
+        this.key = new RecordKey(tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
+        this.key = new RecordKey(dbName, tableName);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, List<Object> value) {
+        this.key = new RecordKey(tableName);
         this.value = value;
     }
 
@@ -60,55 +75,41 @@ public class HiveProducerRecord {
         return sb.toString();
     }
 
-    @Override
     public boolean equals(Object o) {
-        if (this == o)
+        if (this == o) {
             return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        HiveProducerRecord record = (HiveProducerRecord) o;
-
-        if (key != null ? !key.equals(record.key) : record.key != null)
+        } else if (!(o instanceof HiveProducerRecord)) {
             return false;
-        return value != null ? value.equals(record.value) : record.value == null;
+        } else {
+            HiveProducerRecord that = (HiveProducerRecord) o;
+            if (this.key != null) {
+                if (!this.key.equals(that.key)) {
+                    return false;
+                }
+            } else if (that.key != null) {
+                return false;
+            }
+            if (this.value != null) {
+                if (!this.value.equals(that.value)) {
+                    return false;
+                }
+            } else if (that.value != null) {
+                return false;
+            }
+        }
+        return true;
     }
 
-    @Override
     public int hashCode() {
-        int result = key != null ? key.hashCode() : 0;
-        result = 31 * result + (value != null ? value.hashCode() : 0);
+        int result = this.key != null ? this.key.hashCode() : 0;
+        result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
         return result;
     }
 
-    public static class KeyBuilder {
-        private final String tableName;
-        private String dbName;
-        private Map<String, String> partitionKVs;
-
-        public KeyBuilder(String tableName) {
-            this.tableName = tableName;
-        }
-
-        public KeyBuilder setDbName(String dbName) {
-            this.dbName = dbName;
-            return this;
-        }
-
-        public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) {
-            this.partitionKVs = partitionKVs;
-            return this;
-        }
-
-        public RecordKey build() {
-            return new RecordKey(dbName, tableName, partitionKVs);
-        }
-    }
-
     /**
      * Use to organize metrics message
      */
-    public static class RecordKey {
+    public class RecordKey {
         public static final String DEFAULT_DB_NAME = "DEFAULT";
 
         private final String dbName;
@@ -125,6 +126,18 @@ public class HiveProducerRecord {
             this.partitionKVs = partitionKVs;
         }
 
+        public RecordKey(String tableName, Map<String, String> partitionKVs) {
+            this(null, tableName, partitionKVs);
+        }
+
+        public RecordKey(String dbName, String tableName) {
+            this(dbName, tableName, null);
+        }
+
+        public RecordKey(String tableName) {
+            this(null, tableName, null);
+        }
+
         public String database() {
             return this.dbName;
         }
@@ -139,31 +152,47 @@ public class HiveProducerRecord {
 
         public String toString() {
             String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString();
-            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs
-                    + ")";
+            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")";
         }
 
-        @Override
         public boolean equals(Object o) {
-            if (this == o)
+            if (this == o) {
                 return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            RecordKey recordKey = (RecordKey) o;
-
-            if (dbName != null ? !dbName.equals(recordKey.dbName) : recordKey.dbName != null)
-                return false;
-            if (tableName != null ? !tableName.equals(recordKey.tableName) : recordKey.tableName != null)
+            } else if (!(o instanceof RecordKey)) {
                 return false;
-            return partitionKVs != null ? partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null;
+            } else {
+                RecordKey that = (RecordKey) o;
+                if (this.dbName != null) {
+                    if (!this.dbName.equals(that.dbName)) {
+                        return false;
+                    }
+                } else if (that.dbName != null) {
+                    return false;
+                }
+
+                if (this.tableName != null) {
+                    if (!this.tableName.equals(that.tableName)) {
+                        return false;
+                    }
+                } else if (that.tableName != null) {
+                    return false;
+                }
+
+                if (this.partitionKVs != null) {
+                    if (!this.partitionKVs.equals(that.partitionKVs)) {
+                        return false;
+                    }
+                } else if (that.partitionKVs != null) {
+                    return false;
+                }
+            }
+            return true;
         }
 
-        @Override
         public int hashCode() {
-            int result = dbName != null ? dbName.hashCode() : 0;
-            result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
-            result = 31 * result + (partitionKVs != null ? partitionKVs.hashCode() : 0);
+            int result = this.dbName != null ? this.dbName.hashCode() : 0;
+            result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0);
+            result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0);
             return result;
         }
     }
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
index d1e252f..9d93e99 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -84,10 +84,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
         stop();
     }
 
-    HiveReservoirListener getListener() {
-        return listener;
-    }
-
     /**
      * A builder for {@link HiveReservoirReporter} instances.
      */
@@ -111,19 +107,15 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
         }
     }
 
-    class HiveReservoirListener implements ActiveReservoirListener {
+    private class HiveReservoirListener implements ActiveReservoirListener {
         private Properties props;
         private Map<String, HiveProducer> producerMap = new HashMap<>();
 
-        private long nRecord = 0;
-        private long nRecordSkip = 0;
-        private long nUpdate = 0;
-
         private HiveReservoirListener(Properties props) throws Exception {
             this.props = props;
         }
 
-        synchronized HiveProducer getProducer(String metricType) throws Exception {
+        private synchronized HiveProducer getProducer(String metricType) throws Exception {
             HiveProducer producer = producerMap.get(metricType);
             if (producer == null) {
                 producer = new HiveProducer(metricType, props);
@@ -137,7 +129,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
                 return true;
             }
             logger.info("Try to write {} records", records.size());
-            long prevNRecord = nRecord;
             try {
                 Map<String, List<Record>> queues = new HashMap<>();
                 for (Record record : records) {
@@ -151,17 +142,21 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
                 for (Map.Entry<String, List<Record>> entry : queues.entrySet()) {
                     HiveProducer producer = getProducer(entry.getKey());
                     producer.send(entry.getValue());
-                    nRecord += entry.getValue().size();
                 }
                 queues.clear();
-                if (nUpdate++ % 100 == 0) {
-                    logger.info("Has done the update {} times with {} records reported, {} records skipped", nUpdate,
-                            nRecord, nRecordSkip);
-                }
             } catch (Exception e) {
-                nRecordSkip += records.size() - (nRecord - prevNRecord);
                 logger.error(e.getMessage(), e);
-                logger.warn("Has skipped reporting {} records", nRecordSkip);
+                return false;
+            }
+            return true;
+        }
+
+        public boolean onRecordUpdate(final Record record) {
+            try {
+                HiveProducer producer = getProducer(record.getSubject());
+                producer.send(record);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
                 return false;
             }
             return true;
@@ -173,13 +168,5 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
             }
             producerMap.clear();
         }
-
-        public long getNRecord() {
-            return nRecord;
-        }
-
-        public long getNRecordSkip() {
-            return nRecordSkip;
-        }
     }
 }
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
deleted file mode 100644
index ead74ad..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER;
-import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class HiveProducerRecordTest {
-
-    @Test
-    public void testRecord() {
-        String dbName = "KYLIN";
-        String tableName = "test";
-        Map<String, String> partitionKVs = Maps.newHashMap();
-        partitionKVs.put("key1", "value1");
-
-        Set<RecordKey> keySet = Sets.newHashSet();
-        RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build();
-        RecordKey key11 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build();
-        keySet.add(key1);
-        keySet.add(key11);
-        assertEquals(1, keySet.size());
-
-        RecordKey key2 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build();
-        RecordKey key3 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs)
-                .build();
-        keySet.add(key2);
-        keySet.add(key3);
-        assertEquals(3, keySet.size());
-        assertEquals(dbName, key2.database());
-        assertEquals(tableName, key2.table());
-
-        List<Object> value1 = Lists.<Object> newArrayList(1);
-        List<Object> value2 = Lists.<Object> newArrayList(1, "1");
-
-        assertNull(new HiveProducerRecord(key1, null).valueToString());
-
-        Set<HiveProducerRecord> recordSet = Sets.newHashSet();
-        HiveProducerRecord record1 = new HiveProducerRecord(key1, value1);
-        HiveProducerRecord record11 = new HiveProducerRecord(key11, value1);
-        recordSet.add(record1);
-        recordSet.add(record11);
-        assertEquals(1, recordSet.size());
-        assertEquals(key1, record1.key());
-        assertEquals(value1, record1.value());
-
-        recordSet.add(new HiveProducerRecord(key1, value2));
-        recordSet.add(new HiveProducerRecord(key2, value1));
-        assertEquals(3, recordSet.size());
-        assertEquals(1, record1.valueToString().split(DELIMITER).length);
-    }
-}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
deleted file mode 100644
index 2adc34f..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
-import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.source.hive.HiveMetaStoreClientFactory;
-import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem",
-        "org.apache.kylin.source.hive.HiveMetaStoreClientFactory",
-        "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" })
-public class HiveProducerTest {
-
-    @Rule
-    public PowerMockRule rule = new PowerMockRule();
-
-    private HiveProducer hiveProducer;
-    private HiveMetaStoreClient metaStoreClient;
-
-    @Before
-    public void setUp() throws Exception {
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
-        FileSystem hdfs = PowerMockito.mock(FileSystem.class);
-        URI uri = PowerMockito.mock(URI.class);
-        PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", Configuration.class)).toReturn(hdfs);
-        PowerMockito.when(hdfs.getUri()).thenReturn(uri);
-        PowerMockito.when(uri.toString()).thenReturn("hdfs");
-
-        HiveConf hiveConf = PowerMockito.mock(HiveConf.class);
-        String metricsType = new HiveSink()
-                .getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
-
-        hiveProducer = new HiveProducer(metricsType, new Properties(), hiveConf);
-
-        metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class);
-        PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient);
-        PowerMockito
-                .stub(PowerMockito.method(HiveMetaStoreClientFactory.class, "getHiveMetaStoreClient", HiveConf.class))
-                .toReturn(metaStoreClient);
-    }
-
-    @After
-    public void after() throws Exception {
-        System.clearProperty(KylinConfig.KYLIN_CONF);
-    }
-
-    @Test
-    public void testProduce() throws Exception {
-        TimedRecordEvent rpcEvent = generateTestRPCRecord();
-
-        Map<String, String> partitionKVs = Maps.newHashMap();
-        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rpcEvent.getDayDate());
-
-        List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default", "test_cube", "sandbox", "NULL", 80L, 3L,
-                3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(), rpcEvent.getMonth(),
-                rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(), rpcEvent.getTimeHour(), rpcEvent.getTimeMinute(),
-                rpcEvent.getTimeSecond(), rpcEvent.getDayDate());
-
-        HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test")
-                .setDbName("KYLIN").setPartitionKVs(partitionKVs).build();
-        HiveProducerRecord target = new HiveProducerRecord(key, value);
-
-        prepareMockForEvent(rpcEvent);
-        assertEquals(target, hiveProducer.convertTo(rpcEvent));
-    }
-
-    private void prepareMockForEvent(RecordEvent event) throws Exception {
-        String tableFullName = new HiveSink().getTableFromSubject(event.getEventType());
-        Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableFullName);
-        String dbName = tableNameSplits.getFirst();
-        String tableName = tableNameSplits.getSecond();
-
-        Table table = PowerMockito.mock(Table.class);
-        PowerMockito.when(metaStoreClient, "getTable", dbName, tableName).thenReturn(table);
-
-        StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class);
-        PowerMockito.when(table, "getSd").thenReturn(sd);
-        PowerMockito.when(sd, "getLocation").thenReturn(null);
-
-        List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
-        List<Pair<String, String>> partitions = HiveTableCreator.getPartitionKVsForHiveTable();
-        columns.addAll(partitions);
-        List<FieldSchema> fields = Lists.newArrayListWithExpectedSize(columns.size());
-        for (Pair<String, String> column : columns) {
-            fields.add(new FieldSchema(column.getFirst(), column.getSecond(), null));
-        }
-        PowerMockito.when(metaStoreClient, "getFields", dbName, tableName).thenReturn(fields);
-    }
-
-    private TimedRecordEvent generateTestRPCRecord() {
-        TimedRecordEvent rpcMetricsEvent = new TimedRecordEvent("metrics_query_rpc_test");
-        setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox", null);
-        setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L);
-        return rpcMetricsEvent;
-    }
-
-    private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
-            String rpcServer, Throwable throwable) {
-        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName);
-        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer);
-        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount,
-            long returnCount, long aggrCount) {
-        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs);
-        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
-        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
-    }
-}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
deleted file mode 100644
index fbb656c..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metrics.lib.ActiveReservoir;
-import org.apache.kylin.metrics.lib.Record;
-import org.apache.kylin.metrics.lib.impl.InstantReservoir;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-import com.google.common.collect.Lists;
-
-@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class })
-public class HiveReservoirReporterTest {
-
-    @Rule
-    public PowerMockRule rule = new PowerMockRule();
-
-    private HiveReservoirReporter hiveReporter;
-    private ActiveReservoir reservoir;
-
-    @Before
-    public void setUp() throws Exception {
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
-        HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class);
-        PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer);
-
-        reservoir = new InstantReservoir();
-        reservoir.start();
-        hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build();
-    }
-
-    @After
-    public void after() throws Exception {
-        System.clearProperty(KylinConfig.KYLIN_CONF);
-    }
-
-    @Test
-    public void testUpdate() throws Exception {
-        String metricsType = "TEST";
-        Record record = new RecordEvent(metricsType);
-        reservoir.update(record);
-        assertEquals(0, hiveReporter.getListener().getNRecord());
-
-        hiveReporter.start();
-        reservoir.update(record);
-        reservoir.update(record);
-        assertEquals(2, hiveReporter.getListener().getNRecord());
-
-        hiveReporter.stop();
-        reservoir.update(record);
-        assertEquals(2, hiveReporter.getListener().getNRecord());
-
-        hiveReporter.start();
-        reservoir.update(record);
-        PowerMockito.doThrow(new Exception()).when(hiveReporter.getListener().getProducer(metricsType))
-                .send(Lists.newArrayList(record));
-        reservoir.update(record);
-        assertEquals(3, hiveReporter.getListener().getNRecord());
-        assertEquals(1, hiveReporter.getListener().getNRecordSkip());
-    }
-}
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index 30759d4..173febdc 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -42,24 +42,5 @@
             <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-mockito</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4-rule-agent</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
index b1a1bd1..df79c57 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -115,12 +115,4 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis
         logger.debug("Cannot find topic {}", topic);
         topicsIfAvailable.put(topic, System.currentTimeMillis());
     }
-
-    public long getNRecord() {
-        return nRecord;
-    }
-
-    public long getNRecordSkip() {
-        return nRecordSkip;
-    }
 }
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
index 97b839c..a7b58a6 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -88,10 +88,6 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
         stop();
     }
 
-    KafkaReservoirListener getListener() {
-        return listener;
-    }
-
     /**
      * A builder for {@link KafkaReservoirReporter} instances.
      */
@@ -117,7 +113,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
         }
     }
 
-    class KafkaReservoirListener extends KafkaActiveReserviorListener {
+    private class KafkaReservoirListener extends KafkaActiveReserviorListener {
         protected final Producer<byte[], byte[]> producer;
 
         private KafkaReservoirListener(Properties props) {
diff --git a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
deleted file mode 100644
index 4a14e66..0000000
--- a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.kafka;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metrics.lib.ActiveReservoir;
-import org.apache.kylin.metrics.lib.Record;
-import org.apache.kylin.metrics.lib.impl.InstantReservoir;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class })
-public class KafkaReservoirReporterTest {
-
-    @Rule
-    public PowerMockRule rule = new PowerMockRule();
-
-    private KafkaReservoirReporter kafkaReporter;
-    private ActiveReservoir reservoir;
-
-    @Before
-    public void setUp() throws Exception {
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
-        KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class);
-        PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer);
-
-        reservoir = new InstantReservoir();
-        reservoir.start();
-        kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build();
-    }
-
-    @After
-    public void after() throws Exception {
-        System.clearProperty(KylinConfig.KYLIN_CONF);
-    }
-
-    @Test
-    public void testUpdate() {
-        Record record = new RecordEvent("TEST");
-        reservoir.update(record);
-        assertEquals(0, kafkaReporter.getListener().getNRecord());
-
-        kafkaReporter.start();
-        reservoir.update(record);
-        reservoir.update(record);
-        assertEquals(2, kafkaReporter.getListener().getNRecord());
-
-        kafkaReporter.stop();
-        reservoir.update(record);
-        assertEquals(2, kafkaReporter.getListener().getNRecord());
-        assertEquals(0, kafkaReporter.getListener().getNRecordSkip());
-    }
-}
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
similarity index 99%
rename from metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
rename to tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 2f9eb1d..35d9efb 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -19,8 +19,8 @@
 package org.apache.kylin.tool.metrics.systemcube;
 
 import java.util.List;
-import java.util.Locale;
 
+import java.util.Locale;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
@@ -32,6 +32,7 @@ import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
 import org.apache.kylin.metrics.property.QueryPropertyEnum;
 import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;