You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/13 05:39:38 UTC
[kylin] branch master-hadoop3 updated: Revert "KYLIN-4508 Add unit
test for core-metrics module & reporters"
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master-hadoop3
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master-hadoop3 by this push:
new d8af2c8 Revert "KYLIN-4508 Add unit test for core-metrics module & reporters"
d8af2c8 is described below
commit d8af2c8385535f804009bbbca1f618918de71702
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Tue Oct 13 13:37:01 2020 +0800
Revert "KYLIN-4508 Add unit test for core-metrics module & reporters"
This reverts commit 8d894e2c8098a41d3778933cc93484a2c5f582e3.
---
.../kylin/metrics/lib/impl/TimedRecordEvent.java | 32 ----
metrics-reporter-hive/pom.xml | 25 ----
.../kylin/metrics/lib/impl/hive/HiveProducer.java | 7 +-
.../metrics/lib/impl/hive/HiveProducerRecord.java | 141 +++++++++++-------
.../lib/impl/hive/HiveReservoirReporter.java | 39 ++---
.../lib/impl/hive/HiveProducerRecordTest.java | 81 -----------
.../metrics/lib/impl/hive/HiveProducerTest.java | 161 ---------------------
.../lib/impl/hive/HiveReservoirReporterTest.java | 88 -----------
metrics-reporter-kafka/pom.xml | 19 ---
.../impl/kafka/KafkaActiveReserviorListener.java | 8 -
.../lib/impl/kafka/KafkaReservoirReporter.java | 6 +-
.../lib/impl/kafka/KafkaReservoirReporterTest.java | 79 ----------
.../tool/metrics/systemcube/HiveTableCreator.java | 3 +-
13 files changed, 104 insertions(+), 585 deletions(-)
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
index 984d5f5..a866163 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -44,36 +44,4 @@ public class TimedRecordEvent extends RecordEvent {
super.resetTime();
addTimeDetails();
}
-
- public String getYear() {
- return (String) get(TimePropertyEnum.YEAR.toString());
- }
-
- public String getMonth() {
- return (String) get(TimePropertyEnum.MONTH.toString());
- }
-
- public String getWeekBeginDate() {
- return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString());
- }
-
- public String getDayDate() {
- return (String) get(TimePropertyEnum.DAY_DATE.toString());
- }
-
- public String getDayTime() {
- return (String) get(TimePropertyEnum.DAY_TIME.toString());
- }
-
- public int getTimeHour() {
- return (int) get(TimePropertyEnum.TIME_HOUR.toString());
- }
-
- public int getTimeMinute() {
- return (int) get(TimePropertyEnum.TIME_MINUTE.toString());
- }
-
- public int getTimeSecond() {
- return (int) get(TimePropertyEnum.TIME_SECOND.toString());
- }
}
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
index 87b7808..0159804 100644
--- a/metrics-reporter-hive/pom.xml
+++ b/metrics-reporter-hive/pom.xml
@@ -56,30 +56,5 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
-
- <!-- Env & Test -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4-rule-agent</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index e79010c..8bc7a43 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -309,7 +309,7 @@ public class HiveProducer {
fout = null;
}
- HiveProducerRecord convertTo(Record record) throws Exception {
+ private HiveProducerRecord convertTo(Record record) throws Exception {
Map<String, Object> rawValue = record.getValueRaw();
//Set partition values for hive table
@@ -330,8 +330,7 @@ public class HiveProducer {
columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
}
- HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond())
- .setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build();
- return new HiveProducerRecord(key, columnValues);
+ return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs,
+ columnValues);
}
}
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
index fa5222f..650d18a 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -30,8 +30,23 @@ public class HiveProducerRecord {
private final RecordKey key;
private final List<Object> value;
- public HiveProducerRecord(RecordKey key, List<Object> value) {
- this.key = key;
+ public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
+ this.key = new RecordKey(dbName, tableName, partitionKVs);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
+ this.key = new RecordKey(tableName, partitionKVs);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
+ this.key = new RecordKey(dbName, tableName);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String tableName, List<Object> value) {
+ this.key = new RecordKey(tableName);
this.value = value;
}
@@ -60,55 +75,41 @@ public class HiveProducerRecord {
return sb.toString();
}
- @Override
public boolean equals(Object o) {
- if (this == o)
+ if (this == o) {
return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- HiveProducerRecord record = (HiveProducerRecord) o;
-
- if (key != null ? !key.equals(record.key) : record.key != null)
+ } else if (!(o instanceof HiveProducerRecord)) {
return false;
- return value != null ? value.equals(record.value) : record.value == null;
+ } else {
+ HiveProducerRecord that = (HiveProducerRecord) o;
+ if (this.key != null) {
+ if (!this.key.equals(that.key)) {
+ return false;
+ }
+ } else if (that.key != null) {
+ return false;
+ }
+ if (this.value != null) {
+ if (!this.value.equals(that.value)) {
+ return false;
+ }
+ } else if (that.value != null) {
+ return false;
+ }
+ }
+ return true;
}
- @Override
public int hashCode() {
- int result = key != null ? key.hashCode() : 0;
- result = 31 * result + (value != null ? value.hashCode() : 0);
+ int result = this.key != null ? this.key.hashCode() : 0;
+ result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
return result;
}
- public static class KeyBuilder {
- private final String tableName;
- private String dbName;
- private Map<String, String> partitionKVs;
-
- public KeyBuilder(String tableName) {
- this.tableName = tableName;
- }
-
- public KeyBuilder setDbName(String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) {
- this.partitionKVs = partitionKVs;
- return this;
- }
-
- public RecordKey build() {
- return new RecordKey(dbName, tableName, partitionKVs);
- }
- }
-
/**
* Use to organize metrics message
*/
- public static class RecordKey {
+ public class RecordKey {
public static final String DEFAULT_DB_NAME = "DEFAULT";
private final String dbName;
@@ -125,6 +126,18 @@ public class HiveProducerRecord {
this.partitionKVs = partitionKVs;
}
+ public RecordKey(String tableName, Map<String, String> partitionKVs) {
+ this(null, tableName, partitionKVs);
+ }
+
+ public RecordKey(String dbName, String tableName) {
+ this(dbName, tableName, null);
+ }
+
+ public RecordKey(String tableName) {
+ this(null, tableName, null);
+ }
+
public String database() {
return this.dbName;
}
@@ -139,31 +152,47 @@ public class HiveProducerRecord {
public String toString() {
String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString();
- return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs
- + ")";
+ return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")";
}
- @Override
public boolean equals(Object o) {
- if (this == o)
+ if (this == o) {
return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- RecordKey recordKey = (RecordKey) o;
-
- if (dbName != null ? !dbName.equals(recordKey.dbName) : recordKey.dbName != null)
- return false;
- if (tableName != null ? !tableName.equals(recordKey.tableName) : recordKey.tableName != null)
+ } else if (!(o instanceof RecordKey)) {
return false;
- return partitionKVs != null ? partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null;
+ } else {
+ RecordKey that = (RecordKey) o;
+ if (this.dbName != null) {
+ if (!this.dbName.equals(that.dbName)) {
+ return false;
+ }
+ } else if (that.dbName != null) {
+ return false;
+ }
+
+ if (this.tableName != null) {
+ if (!this.tableName.equals(that.tableName)) {
+ return false;
+ }
+ } else if (that.tableName != null) {
+ return false;
+ }
+
+ if (this.partitionKVs != null) {
+ if (!this.partitionKVs.equals(that.partitionKVs)) {
+ return false;
+ }
+ } else if (that.partitionKVs != null) {
+ return false;
+ }
+ }
+ return true;
}
- @Override
public int hashCode() {
- int result = dbName != null ? dbName.hashCode() : 0;
- result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
- result = 31 * result + (partitionKVs != null ? partitionKVs.hashCode() : 0);
+ int result = this.dbName != null ? this.dbName.hashCode() : 0;
+ result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0);
+ result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0);
return result;
}
}
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
index d1e252f..9d93e99 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -84,10 +84,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
stop();
}
- HiveReservoirListener getListener() {
- return listener;
- }
-
/**
* A builder for {@link HiveReservoirReporter} instances.
*/
@@ -111,19 +107,15 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
}
}
- class HiveReservoirListener implements ActiveReservoirListener {
+ private class HiveReservoirListener implements ActiveReservoirListener {
private Properties props;
private Map<String, HiveProducer> producerMap = new HashMap<>();
- private long nRecord = 0;
- private long nRecordSkip = 0;
- private long nUpdate = 0;
-
private HiveReservoirListener(Properties props) throws Exception {
this.props = props;
}
- synchronized HiveProducer getProducer(String metricType) throws Exception {
+ private synchronized HiveProducer getProducer(String metricType) throws Exception {
HiveProducer producer = producerMap.get(metricType);
if (producer == null) {
producer = new HiveProducer(metricType, props);
@@ -137,7 +129,6 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
return true;
}
logger.info("Try to write {} records", records.size());
- long prevNRecord = nRecord;
try {
Map<String, List<Record>> queues = new HashMap<>();
for (Record record : records) {
@@ -151,17 +142,21 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
for (Map.Entry<String, List<Record>> entry : queues.entrySet()) {
HiveProducer producer = getProducer(entry.getKey());
producer.send(entry.getValue());
- nRecord += entry.getValue().size();
}
queues.clear();
- if (nUpdate++ % 100 == 0) {
- logger.info("Has done the update {} times with {} records reported, {} records skipped", nUpdate,
- nRecord, nRecordSkip);
- }
} catch (Exception e) {
- nRecordSkip += records.size() - (nRecord - prevNRecord);
logger.error(e.getMessage(), e);
- logger.warn("Has skipped reporting {} records", nRecordSkip);
+ return false;
+ }
+ return true;
+ }
+
+ public boolean onRecordUpdate(final Record record) {
+ try {
+ HiveProducer producer = getProducer(record.getSubject());
+ producer.send(record);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
return false;
}
return true;
@@ -173,13 +168,5 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
}
producerMap.clear();
}
-
- public long getNRecord() {
- return nRecord;
- }
-
- public long getNRecordSkip() {
- return nRecordSkip;
- }
}
}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
deleted file mode 100644
index ead74ad..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER;
-import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class HiveProducerRecordTest {
-
- @Test
- public void testRecord() {
- String dbName = "KYLIN";
- String tableName = "test";
- Map<String, String> partitionKVs = Maps.newHashMap();
- partitionKVs.put("key1", "value1");
-
- Set<RecordKey> keySet = Sets.newHashSet();
- RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build();
- RecordKey key11 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build();
- keySet.add(key1);
- keySet.add(key11);
- assertEquals(1, keySet.size());
-
- RecordKey key2 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build();
- RecordKey key3 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs)
- .build();
- keySet.add(key2);
- keySet.add(key3);
- assertEquals(3, keySet.size());
- assertEquals(dbName, key2.database());
- assertEquals(tableName, key2.table());
-
- List<Object> value1 = Lists.<Object> newArrayList(1);
- List<Object> value2 = Lists.<Object> newArrayList(1, "1");
-
- assertNull(new HiveProducerRecord(key1, null).valueToString());
-
- Set<HiveProducerRecord> recordSet = Sets.newHashSet();
- HiveProducerRecord record1 = new HiveProducerRecord(key1, value1);
- HiveProducerRecord record11 = new HiveProducerRecord(key11, value1);
- recordSet.add(record1);
- recordSet.add(record11);
- assertEquals(1, recordSet.size());
- assertEquals(key1, record1.key());
- assertEquals(value1, record1.value());
-
- recordSet.add(new HiveProducerRecord(key1, value2));
- recordSet.add(new HiveProducerRecord(key2, value1));
- assertEquals(3, recordSet.size());
- assertEquals(1, record1.valueToString().split(DELIMITER).length);
- }
-}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
deleted file mode 100644
index 2adc34f..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.junit.Assert.assertEquals;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
-import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.source.hive.HiveMetaStoreClientFactory;
-import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem",
- "org.apache.kylin.source.hive.HiveMetaStoreClientFactory",
- "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" })
-public class HiveProducerTest {
-
- @Rule
- public PowerMockRule rule = new PowerMockRule();
-
- private HiveProducer hiveProducer;
- private HiveMetaStoreClient metaStoreClient;
-
- @Before
- public void setUp() throws Exception {
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
- FileSystem hdfs = PowerMockito.mock(FileSystem.class);
- URI uri = PowerMockito.mock(URI.class);
- PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", Configuration.class)).toReturn(hdfs);
- PowerMockito.when(hdfs.getUri()).thenReturn(uri);
- PowerMockito.when(uri.toString()).thenReturn("hdfs");
-
- HiveConf hiveConf = PowerMockito.mock(HiveConf.class);
- String metricsType = new HiveSink()
- .getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
-
- hiveProducer = new HiveProducer(metricsType, new Properties(), hiveConf);
-
- metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class);
- PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient);
- PowerMockito
- .stub(PowerMockito.method(HiveMetaStoreClientFactory.class, "getHiveMetaStoreClient", HiveConf.class))
- .toReturn(metaStoreClient);
- }
-
- @After
- public void after() throws Exception {
- System.clearProperty(KylinConfig.KYLIN_CONF);
- }
-
- @Test
- public void testProduce() throws Exception {
- TimedRecordEvent rpcEvent = generateTestRPCRecord();
-
- Map<String, String> partitionKVs = Maps.newHashMap();
- partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rpcEvent.getDayDate());
-
- List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default", "test_cube", "sandbox", "NULL", 80L, 3L,
- 3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(), rpcEvent.getMonth(),
- rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(), rpcEvent.getTimeHour(), rpcEvent.getTimeMinute(),
- rpcEvent.getTimeSecond(), rpcEvent.getDayDate());
-
- HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test")
- .setDbName("KYLIN").setPartitionKVs(partitionKVs).build();
- HiveProducerRecord target = new HiveProducerRecord(key, value);
-
- prepareMockForEvent(rpcEvent);
- assertEquals(target, hiveProducer.convertTo(rpcEvent));
- }
-
- private void prepareMockForEvent(RecordEvent event) throws Exception {
- String tableFullName = new HiveSink().getTableFromSubject(event.getEventType());
- Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableFullName);
- String dbName = tableNameSplits.getFirst();
- String tableName = tableNameSplits.getSecond();
-
- Table table = PowerMockito.mock(Table.class);
- PowerMockito.when(metaStoreClient, "getTable", dbName, tableName).thenReturn(table);
-
- StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class);
- PowerMockito.when(table, "getSd").thenReturn(sd);
- PowerMockito.when(sd, "getLocation").thenReturn(null);
-
- List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
- List<Pair<String, String>> partitions = HiveTableCreator.getPartitionKVsForHiveTable();
- columns.addAll(partitions);
- List<FieldSchema> fields = Lists.newArrayListWithExpectedSize(columns.size());
- for (Pair<String, String> column : columns) {
- fields.add(new FieldSchema(column.getFirst(), column.getSecond(), null));
- }
- PowerMockito.when(metaStoreClient, "getFields", dbName, tableName).thenReturn(fields);
- }
-
- private TimedRecordEvent generateTestRPCRecord() {
- TimedRecordEvent rpcMetricsEvent = new TimedRecordEvent("metrics_query_rpc_test");
- setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox", null);
- setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L);
- return rpcMetricsEvent;
- }
-
- private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
- String rpcServer, Throwable throwable) {
- metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
- metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName);
- metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer);
- metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
- throwable == null ? "NULL" : throwable.getClass().getName());
- }
-
- private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount,
- long returnCount, long aggrCount) {
- metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs);
- metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
- metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
- metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
- metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
- metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
- }
-}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
deleted file mode 100644
index fbb656c..0000000
--- a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.hive;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metrics.lib.ActiveReservoir;
-import org.apache.kylin.metrics.lib.Record;
-import org.apache.kylin.metrics.lib.impl.InstantReservoir;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-import com.google.common.collect.Lists;
-
-@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class })
-public class HiveReservoirReporterTest {
-
- @Rule
- public PowerMockRule rule = new PowerMockRule();
-
- private HiveReservoirReporter hiveReporter;
- private ActiveReservoir reservoir;
-
- @Before
- public void setUp() throws Exception {
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
- HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class);
- PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer);
-
- reservoir = new InstantReservoir();
- reservoir.start();
- hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build();
- }
-
- @After
- public void after() throws Exception {
- System.clearProperty(KylinConfig.KYLIN_CONF);
- }
-
- @Test
- public void testUpdate() throws Exception {
- String metricsType = "TEST";
- Record record = new RecordEvent(metricsType);
- reservoir.update(record);
- assertEquals(0, hiveReporter.getListener().getNRecord());
-
- hiveReporter.start();
- reservoir.update(record);
- reservoir.update(record);
- assertEquals(2, hiveReporter.getListener().getNRecord());
-
- hiveReporter.stop();
- reservoir.update(record);
- assertEquals(2, hiveReporter.getListener().getNRecord());
-
- hiveReporter.start();
- reservoir.update(record);
- PowerMockito.doThrow(new Exception()).when(hiveReporter.getListener().getProducer(metricsType))
- .send(Lists.newArrayList(record));
- reservoir.update(record);
- assertEquals(3, hiveReporter.getListener().getNRecord());
- assertEquals(1, hiveReporter.getListener().getNRecordSkip());
- }
-}
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index 30759d4..173febdc 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -42,24 +42,5 @@
<artifactId>kafka_2.11</artifactId>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4-rule-agent</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
index b1a1bd1..df79c57 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -115,12 +115,4 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis
logger.debug("Cannot find topic {}", topic);
topicsIfAvailable.put(topic, System.currentTimeMillis());
}
-
- public long getNRecord() {
- return nRecord;
- }
-
- public long getNRecordSkip() {
- return nRecordSkip;
- }
}
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
index 97b839c..a7b58a6 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -88,10 +88,6 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
stop();
}
- KafkaReservoirListener getListener() {
- return listener;
- }
-
/**
* A builder for {@link KafkaReservoirReporter} instances.
*/
@@ -117,7 +113,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
}
}
- class KafkaReservoirListener extends KafkaActiveReserviorListener {
+ private class KafkaReservoirListener extends KafkaActiveReserviorListener {
protected final Producer<byte[], byte[]> producer;
private KafkaReservoirListener(Properties props) {
diff --git a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
deleted file mode 100644
index 4a14e66..0000000
--- a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib.impl.kafka;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metrics.lib.ActiveReservoir;
-import org.apache.kylin.metrics.lib.Record;
-import org.apache.kylin.metrics.lib.impl.InstantReservoir;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.rule.PowerMockRule;
-
-@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class })
-public class KafkaReservoirReporterTest {
-
- @Rule
- public PowerMockRule rule = new PowerMockRule();
-
- private KafkaReservoirReporter kafkaReporter;
- private ActiveReservoir reservoir;
-
- @Before
- public void setUp() throws Exception {
- System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
-
- KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class);
- PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer);
-
- reservoir = new InstantReservoir();
- reservoir.start();
- kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build();
- }
-
- @After
- public void after() throws Exception {
- System.clearProperty(KylinConfig.KYLIN_CONF);
- }
-
- @Test
- public void testUpdate() {
- Record record = new RecordEvent("TEST");
- reservoir.update(record);
- assertEquals(0, kafkaReporter.getListener().getNRecord());
-
- kafkaReporter.start();
- reservoir.update(record);
- reservoir.update(record);
- assertEquals(2, kafkaReporter.getListener().getNRecord());
-
- kafkaReporter.stop();
- reservoir.update(record);
- assertEquals(2, kafkaReporter.getListener().getNRecord());
- assertEquals(0, kafkaReporter.getListener().getNRecordSkip());
- }
-}
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
similarity index 99%
rename from metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
rename to tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 2f9eb1d..35d9efb 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -19,8 +19,8 @@
package org.apache.kylin.tool.metrics.systemcube;
import java.util.List;
-import java.util.Locale;
+import java.util.Locale;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
@@ -32,6 +32,7 @@ import org.apache.kylin.metrics.property.JobPropertyEnum;
import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
import org.apache.kylin.metrics.property.QueryPropertyEnum;
import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+
import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.kylin.shaded.com.google.common.collect.Lists;