You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by li...@apache.org on 2015/12/20 16:47:38 UTC
incubator-eagle git commit: EAGLE-75 Integrate dropwizard metric
framework
Repository: incubator-eagle
Updated Branches:
refs/heads/master 0c583e5d9 -> 628e87537
EAGLE-75 Integrate dropwizard metric framework
https://issues.apache.org/jira/browse/EAGLE-75
- Integrate dropwizard metric framework
- Delete old metric classes
Author: @sunlibin <li...@apache.org>
Reviewer: @RalphSu <su...@gmail.com>, @haoch <ha...@apache.org>
Closes #28
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/628e8753
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/628e8753
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/628e8753
Branch: refs/heads/master
Commit: 628e8753759294d3dfb5a0295493a1f2ef264631
Parents: 0c583e5
Author: sunlibin <ab...@gmail.com>
Authored: Sun Dec 20 23:19:48 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Sun Dec 20 23:43:41 2015 +0800
----------------------------------------------------------------------
.../apache/eagle/executor/AlertExecutor.java | 111 ++++++-------------
.../eagle/alert/cep/TestSiddhiEvaluator.java | 3 -
eagle-core/eagle-metric/pom.xml | 5 +
.../org/apache/eagle/metric/CountingMetric.java | 48 --------
.../java/org/apache/eagle/metric/Metric.java | 88 ---------------
.../org/apache/eagle/metric/MetricOperator.java | 22 ----
.../manager/EagleMetricReportManager.java | 61 ----------
.../metric/report/EagleServiceMetricReport.java | 60 ----------
.../metric/report/MetricEntityConvert.java | 32 ------
.../eagle/metric/report/MetricReport.java | 26 -----
.../metric/reportor/EagleCounterMetric.java | 73 ++++++++++++
.../eagle/metric/reportor/EagleGaugeMetric.java | 45 ++++++++
.../eagle/metric/reportor/EagleMetric.java | 64 +++++++++++
.../eagle/metric/reportor/EagleMetricKey.java | 28 +++++
.../metric/reportor/EagleMetricListener.java | 31 ++++++
.../EagleServiceReporterMetricListener.java | 63 +++++++++++
.../eagle/metric/reportor/IEagleMetric.java | 40 +++++++
.../metric/reportor/MetricEntityAdaptor.java | 50 +++++++++
.../metric/reportor/MetricKeyCodeDecoder.java | 64 +++++++++++
.../eagle/service/client/ServiceConfig.java | 11 ++
.../metric/kafka/KafkaLatestOffsetFetcher.java | 14 +--
.../kafka/KafkaMessageDistributionExecutor.java | 93 ++++++----------
.../eagle/metric/kafka/KafkaOffsetSpout.java | 33 +++---
.../src/main/resources/application.conf | 5 +-
.../partition/DataDistributionDaoImpl.java | 2 +-
25 files changed, 571 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 7e4372c..8b928c3 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -16,38 +16,38 @@
*/
package org.apache.eagle.executor;
+import com.codahale.metrics.MetricRegistry;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.alert.common.AlertConstants;
import org.apache.eagle.alert.config.AbstractPolicyDefinition;
import org.apache.eagle.alert.dao.AlertDefinitionDAO;
import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
import org.apache.eagle.alert.policy.*;
import org.apache.eagle.alert.siddhi.EagleAlertContext;
import org.apache.eagle.alert.siddhi.SiddhiAlertHandler;
import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.metric.reportor.EagleCounterMetric;
+import org.apache.eagle.metric.reportor.EagleMetricListener;
+import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
+import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
private static final long serialVersionUID = 1L;
@@ -68,12 +68,11 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
private static String EAGLE_ALERT_COUNT = "eagle.alert.count";
private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
- private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
- private Map<String, Metric> metricMap; // metricMap's key = metricName[#policyId]
+ private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
private Map<String, String> baseDimensions;
- private Thread metricReportThread;
- private EagleServiceMetricReport metricReport;
+ private MetricRegistry registry;
+ private EagleMetricListener listener;
public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
@@ -88,7 +87,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
public String getAlertExecutorId(){
return this.alertExecutorId;
}
-
+
public int getNumPartitions() {
return this.numPartitions;
}
@@ -115,32 +114,25 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
}
public void initMetricReportor() {
- String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
- this.metricReport = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
- metricMap = new ConcurrentHashMap<String, Metric>();
- baseDimensions = new HashMap<String, String>();
+ //TODO: need to it replace it with batch flush listener
+ registry = new MetricRegistry();
+ listener = new EagleServiceReporterMetricListener(host, port, username, password);
+
+ baseDimensions = new HashMap<>();
baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
baseDimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq));
baseDimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE));
baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
-
- dimensionsMap = new HashMap<String, Map<String, String>>();
- this.metricReportThread = new Thread() {
- @Override
- public void run() {
- runMetricReporter();
- }
- };
- this.metricReportThread.setDaemon(true);
- metricReportThread.start();
+ dimensionsMap = new HashMap<>();
}
@Override
@@ -246,44 +238,17 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
return value / granularity * granularity;
}
- public void runMetricReporter() {
- while(true) {
- try {
- long current = System.currentTimeMillis();
- List<Metric> metricList = new ArrayList<Metric>();
- synchronized (this.metricMap) {
- for (Entry<String, Metric> entry : metricMap.entrySet()) {
- String name = entry.getKey();
- Metric metric = entry.getValue();
- long previous = metric.getTimestamp();
- if (current > previous + MERITE_GRANULARITY) {
- metricList.add(metric);
- metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), metric.getMetricName()));
- }
- }
- }
- if (metricList.size() > 0) {
- LOG.info("Going to persist alert metrics, size: " + metricList.size());
- metricReport.emit(metricList);
- }
- try {
- Thread.sleep(MERITE_GRANULARITY / 2);
- } catch (InterruptedException ex) { /* Do nothing */ }
- }
- catch (Throwable t) {
- LOG.error("Got a throwable in metricReporter " , t);
- }
- }
- }
-
public void updateCounter(String name, Map<String, String> dimensions, double value) {
long current = System.currentTimeMillis();
- synchronized (metricMap) {
- if (metricMap.get(name) == null) {
- String metricName = name.split("#")[0];
- metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), dimensions, metricName));
- }
- metricMap.get(name).update(value);
+ String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
+ if (registry.getMetrics().get(metricName) == null) {
+ EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
+ metric.registerListener(listener);
+ registry.register(metricName, metric);
+ } else {
+ EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName);
+ metric.update(value, current);
+ //TODO: need remove unused metric from registry
}
}
@@ -299,10 +264,6 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
}
return dimensionsMap.get(policyId);
}
-
- public String getMetricKey(String metricName, String policy) {
- return metricName + "#" + policy;
- }
/**
* within this single executor, execute all PolicyEvaluator sequentially
@@ -325,7 +286,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){
String policyId = entry.getKey();
PolicyEvaluator evaluator = entry.getValue();
- updateCounter(getMetricKey(EAGLE_POLICY_EVAL_COUNT, policyId), getDimensions(policyId));
+ updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
try {
EagleAlertContext siddhiAlertContext = new EagleAlertContext();
siddhiAlertContext.alertExecutor = this;
@@ -336,7 +297,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
}
catch (Exception ex) {
LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
- updateCounter(getMetricKey(EAGLE_POLICY_EVAL_FAIL_COUNT, policyId), getDimensions(policyId));
+ updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, getDimensions(policyId));
}
}
}
@@ -400,7 +361,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId));
Collector outputCollector = context.outputCollector;
PolicyEvaluator evaluator = context.evaluator;
- updateCounter(getMetricKey(EAGLE_ALERT_COUNT, policyId), getDimensions(policyId), alerts.size());
+ updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
for (AlertAPIEntity entity : alerts) {
synchronized(this) {
outputCollector.collect(new Tuple2(policyId, entity));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index 47381d1..c1b4185 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -107,9 +107,6 @@ public class TestSiddhiEvaluator {
public Map<String, String> getDimensions(String policyId) {
return new HashMap<String, String>();
}
-
- @Override
- public void runMetricReporter() {}
};
context.alertExecutor.prepareConfig(config);
context.alertExecutor.init();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/pom.xml b/eagle-core/eagle-metric/pom.xml
index d1fc1d4..c12a2b9 100644
--- a/eagle-core/eagle-metric/pom.xml
+++ b/eagle-core/eagle-metric/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.1.2</version>
+ </dependency>
+ <dependency>
<groupId>eagle</groupId>
<artifactId>eagle-entity-base</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
deleted file mode 100644
index 4f65b8e..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- */
-public class CountingMetric extends Metric{
-
- public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, double value) {
- super(timestamp, dimensions, metricName, new AtomicDouble(value));
- }
-
- public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
- super(timestamp, dimensions, metricName, value);
- }
-
- public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName) {
- this(timestamp, dimensions, metricName, new AtomicDouble(0.0));
- }
-
- public CountingMetric(CountingMetric metric) {
- this(metric.timestamp, new HashMap<>(metric.dimensions), metric.metricName, metric.value);
- }
-
- @Override
- public double update(double delta) {
- return value.addAndGet(delta);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
deleted file mode 100644
index 616c82b..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- */
-public abstract class Metric implements MetricOperator{
-
- protected final long timestamp;
- protected final Map<String, String> dimensions;
- protected final String metricName;
- protected final AtomicDouble value;
-
- public Metric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
- this.timestamp = timestamp;
- this.dimensions = new HashMap<>(dimensions);
- this.metricName = metricName;
- this.value = value;
- }
-
- public Metric(long timestamp, Map<String, String> dimensions, String metricName) {
- this(timestamp, dimensions,metricName, new AtomicDouble(0.0));
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public Map<String, String> getDimensions() {
- return dimensions;
- }
-
- public String getMetricName() {
- return metricName;
- }
-
- public AtomicDouble getValue() {
- return value;
- }
-
- @Override
- public int hashCode() {
- int hashCode = (int) (timestamp % Integer.MAX_VALUE);
- for (Entry<String, String> entry : dimensions.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- hashCode ^= key.hashCode() ^ value.hashCode();
- }
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Metric) {
- Metric event = (Metric) obj;
- if (dimensions.size() != event.dimensions.size()) return false;
- for (Entry<String, String> keyValue : event.dimensions.entrySet()) {
- boolean keyExist = dimensions.containsKey(keyValue.getKey());
- if ( !keyExist || !dimensions.get(keyValue.getKey()).equals(keyValue.getValue())) {
- return false;
- }
- }
- if (timestamp != event.timestamp) return false;
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
deleted file mode 100644
index 2059ea4..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-public interface MetricOperator {
-
- double update(double value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
deleted file mode 100644
index 153159c..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.manager;
-
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.MetricReport;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class EagleMetricReportManager {
-
- private static EagleMetricReportManager manager = new EagleMetricReportManager();
- private Map<String, MetricReport> metricReportMap = new ConcurrentHashMap<>();
-
- private EagleMetricReportManager() {
-
- }
-
- public static EagleMetricReportManager getInstance () {
- return manager;
- }
-
- public boolean register(String name, MetricReport report) {
- if (metricReportMap.get(name) == null) {
- synchronized (metricReportMap) {
- if (metricReportMap.get(name) == null) {
- metricReportMap.put(name, report);
- return true;
- }
- }
- }
- return false;
- }
-
- public Map<String, MetricReport> getRegisteredReports() {
- return metricReportMap;
- }
-
- public void emit(List<Metric> list) {
- synchronized (this.metricReportMap) {
- for (MetricReport report : metricReportMap.values()) {
- report.emit(list);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
deleted file mode 100644
index 7ff415e..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class EagleServiceMetricReport implements MetricReport{
-
- private EagleServiceClientImpl client;
- private static final Logger LOG = LoggerFactory.getLogger(EagleServiceMetricReport.class);
-
- public EagleServiceMetricReport(String host, int port, String username, String password) {
- client = new EagleServiceClientImpl(host, port, username, password);
- }
-
- public EagleServiceMetricReport(String host, int port) {
- client = new EagleServiceClientImpl(host, port, null, null);
- }
-
- public void emit(List<Metric> list) {
- List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>();
- for (Metric metric : list) {
- entities.add(MetricEntityConvert.convert(metric));
- }
- try {
- int total = entities.size();
- GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
- if(response.isSuccess()) {
- LOG.info("Wrote " + total + " entities to service");
- }else{
- LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
- }
- }
- catch (Exception ex) {
- LOG.error("Got exception while writing entities: ", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
deleted file mode 100644
index c389fa7..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.metric.Metric;
-
-public class MetricEntityConvert {
-
- public static GenericMetricEntity convert(Metric metric) {
- GenericMetricEntity entity = new GenericMetricEntity();
- entity.setPrefix(metric.getMetricName());
- entity.setValue(new double[]{metric.getValue().get()});
- entity.setTags(metric.getDimensions());
- entity.setTimestamp(metric.getTimestamp());
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
deleted file mode 100644
index 85d423b..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import java.util.List;
-
-import org.apache.eagle.metric.Metric;
-
-public interface MetricReport {
- // The method should be thread safe
- void emit(List<Metric> list);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
new file mode 100644
index 0000000..0a7f70e
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class EagleCounterMetric extends EagleMetric {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EagleCounterMetric.class);
+
+ public EagleCounterMetric(long latestUserTimeClock, String name, double value, long granularity) {
+ super(latestUserTimeClock, name, value, granularity);
+ }
+
+ public EagleCounterMetric(EagleCounterMetric metric) {
+ super(metric);
+ }
+
+ public long trim(long latestUserTimeClock) {
+ return latestUserTimeClock / granularity * granularity;
+ }
+
+ public void flush(long latestUserTimeClock) {
+ for (EagleMetricListener listener : metricListeners) {
+ EagleCounterMetric newEagleMetric = new EagleCounterMetric(this);
+ newEagleMetric.name = MetricKeyCodeDecoder.addTimestampToMetricKey(trim(latestUserTimeClock), newEagleMetric.name);
+ listener.onMetricFlushed(Arrays.asList((EagleMetric)newEagleMetric));
+ }
+ }
+
+ public boolean checkIfNeedFlush(long currentUserTimeClock) {
+ if (currentUserTimeClock - latestUserTimeClock > granularity) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean update(double d, long currentUserTimeClock) {
+ Boolean readyToFlushed = checkIfNeedFlush(currentUserTimeClock);
+ if (!readyToFlushed) {
+ if (currentUserTimeClock < latestUserTimeClock) {
+ LOG.warn("Something must be wrong, event should come in order of userTimeClock");
+ }
+ value.addAndGet(d);
+ }
+ else {
+ flush(latestUserTimeClock);
+ value.getAndSet(1);
+ latestUserTimeClock = currentUserTimeClock;
+ }
+ return readyToFlushed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
new file mode 100644
index 0000000..e6fc098
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class EagleGaugeMetric extends EagleMetric {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EagleGaugeMetric.class);
+
+ public EagleGaugeMetric(long latestUserTimeClock, String name, double value) {
+ super(latestUserTimeClock, name, value, 0);
+ }
+
+ public EagleGaugeMetric(EagleGaugeMetric metric) {
+ super(metric);
+ }
+
+ public boolean update(double d, long currentUserTimeClock) {
+ value.getAndSet(d);
+ this.latestUserTimeClock = currentUserTimeClock;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
new file mode 100644
index 0000000..e45a8ce
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class EagleMetric implements IEagleMetric {
+
+ public long latestUserTimeClock;
+ public AtomicDouble value;
+ public String name;
+ public long granularity;
+ public List<EagleMetricListener> metricListeners = new ArrayList<>();
+ private static final Logger LOG = LoggerFactory.getLogger(EagleCounterMetric.class);
+
+ public EagleMetric(EagleMetric metric) {
+ this.latestUserTimeClock = metric.latestUserTimeClock;
+ this.name = metric.name;
+ this.value = new AtomicDouble(metric.value.doubleValue());
+ this.granularity = metric.granularity;
+ }
+
+ public EagleMetric(long latestUserTimeClock, String name, double value, long granularity) {
+ this.latestUserTimeClock = latestUserTimeClock;
+ this.name = name;
+ this.value = new AtomicDouble(value);
+ this.granularity = granularity;
+ }
+
+ public EagleMetric(long latestUserTimeClock, String metricName, double value) {
+ this(latestUserTimeClock, metricName, value, 5 * DateUtils.MILLIS_PER_MINUTE);
+ }
+
+ public void registerListener(EagleMetricListener listener) {
+ metricListeners.add(listener);
+ }
+
+ public Double getValue() {
+ return value.doubleValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
new file mode 100644
index 0000000..daf6ddf
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.Map;
+
+public class EagleMetricKey {
+ public String metricName;
+ public Map<String, String> tags;
+ public long timestamp;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
new file mode 100644
index 0000000..d8fe7b2
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.List;
+
+public interface EagleMetricListener {
+
+ /**
+ * The method should be called in thread-safe mode
+ * @param metrics
+ */
+ void onMetricFlushed(List<EagleMetric> metrics);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
new file mode 100644
index 0000000..49a8cdc
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EagleServiceReporterMetricListener implements EagleMetricListener{
+
+ private EagleServiceClientImpl client;
+ private static final Logger LOG = LoggerFactory.getLogger(EagleServiceReporterMetricListener.class);
+
+ public EagleServiceReporterMetricListener(String host, int port, String username, String password) {
+ client = new EagleServiceClientImpl(host, port, username, password);
+ }
+
+ public EagleServiceReporterMetricListener(String host, int port) {
+ client = new EagleServiceClientImpl(host, port, null, null);
+ }
+
+ public void onMetricFlushed(List<EagleMetric> metrics) {
+ List<GenericMetricEntity> entities = new ArrayList<>();
+ for (EagleMetric metric : metrics) {
+ String metricName = metric.name;
+ entities.add(MetricEntityAdaptor.convert(metricName, metric));
+ }
+ try {
+ int total = entities.size();
+ GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
+ if(response.isSuccess()) {
+ LOG.info("Wrote " + total + " entities to service");
+ }else{
+ LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
+ }
+ }
+ catch (Exception ex) {
+ LOG.error("Got exception while writing entities: ", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
new file mode 100644
index 0000000..43f8092
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.codahale.metrics.Gauge;
+
+/**
+ * It's just a workaround to extends Gauge instead of Metric interface
+ * For MetricRegistry's notifyListenerOfRemovedMetric method will throw exception on unknown metric type
+ */
+
+public interface IEagleMetric extends Gauge<Double> {
+
+ void registerListener(EagleMetricListener listener);
+
+ /**
+ * return true if the metric need to be flushed, otherwise return false
+ * @param value
+ * @param userTimeClock
+ * @return
+ */
+ boolean update(double value, long userTimeClock);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
new file mode 100644
index 0000000..e1f8154
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.codahale.metrics.Metric;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+public class MetricEntityAdaptor {
+
+ public static GenericMetricEntity convert(String name, Metric metric) {
+ //TODO: add other type metric support
+ EagleMetricKey metricName = MetricKeyCodeDecoder.decodeTSMetricKey(name);
+ if (metric instanceof EagleCounterMetric) {
+ EagleCounterMetric counter = (EagleCounterMetric)metric;
+ GenericMetricEntity entity = new GenericMetricEntity();
+ entity.setPrefix(metricName.metricName);
+ entity.setValue(new double[]{counter.getValue()});
+ entity.setTags(metricName.tags);
+ entity.setTimestamp(metricName.timestamp);
+ return entity;
+ }
+ else if (metric instanceof EagleGaugeMetric) {
+ EagleGaugeMetric gauge = (EagleGaugeMetric)metric;
+ GenericMetricEntity entity = new GenericMetricEntity();
+ entity.setPrefix(metricName.metricName);
+ entity.setValue(new double[]{gauge.getValue()});
+ entity.setTags(metricName.tags);
+ entity.setTimestamp(metricName.timestamp);
+ return entity;
+ }
+ throw new RuntimeException("Not support this metric type for now!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
new file mode 100644
index 0000000..7f798fb
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricKeyCodeDecoder {
+
+ public static String codeMetricKey(String metricName, Map<String, String> tags) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(metricName);
+ for (Map.Entry<String, String> entry : tags.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ sb.append(" " + key + ":" + value);
+ }
+ return sb.toString();
+ }
+
+ public static EagleMetricKey decodeMetricKey(String name) {
+ EagleMetricKey metricName = new EagleMetricKey();
+ String[] parts = name.split(" ");
+ metricName.metricName = parts[0];
+ metricName.tags = new HashMap<>();
+ for (int i = 1; i < parts.length; i++) {
+ String[] keyValue = parts[i].split(":");
+ metricName.tags.put(keyValue[0], keyValue[1]);
+ }
+ return metricName;
+ }
+
+ public static String addTimestampToMetricKey(long timestamp, String metricKey) {
+ return timestamp + " " + metricKey;
+ }
+
+ public static String codeTSMetricKey(long timestamp, String metricName, Map<String, String> tags) {
+ return addTimestampToMetricKey(timestamp, codeMetricKey(metricName, tags));
+ }
+
+ public static EagleMetricKey decodeTSMetricKey(String name) {
+ Integer index = name.indexOf(" ");
+ EagleMetricKey metricKey = decodeMetricKey(name.substring(index + 1));
+ metricKey.timestamp = Long.valueOf(name.substring(0, index));
+ return metricKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
index e68360f..1dbf75b 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -26,4 +26,15 @@ public class ServiceConfig implements Serializable{
public Integer servicePort;
public String username;
public String password;
+
+ public ServiceConfig() {
+
+ }
+
+ public ServiceConfig(String serviceHost, Integer servicePort, String username, String password) {
+ this.serviceHost = serviceHost;
+ this.servicePort = servicePort;
+ this.username = username;
+ this.password = password;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
index de93ea3..90342f3 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
@@ -41,18 +41,16 @@ public class KafkaLatestOffsetFetcher {
Map<Integer, Long> ret = new HashMap<>();
for (int partition = 0; partition < partitionCount; partition++) {
PartitionMetadata metadata = metadatas.get(partition);
- if (metadata == null) {
- throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting");
- }
- if (metadata.leader() == null) {
- throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+ if (metadata == null || metadata.leader() == null) {
+ ret.put(partition, -1L);
+ //throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + topic + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
- long lastestOffset = getLatestOffset(consumer, topic, partition, clientName);
+ long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
if (consumer != null) consumer.close();
- ret.put(partition, lastestOffset);
+ ret.put(partition, latestOffset);
}
return ret;
}
@@ -88,7 +86,7 @@ public class KafkaLatestOffsetFetcher {
}
if (partitionMetadata.size() == partitionCount) break;
} catch (Exception e) {
- throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: " + e);
+ throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
} finally {
if (consumer != null) consumer.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index 7af5ea6..b521d65 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -19,47 +19,31 @@
package org.apache.eagle.metric.kafka;
+import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
+import org.apache.commons.lang.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor1;
import org.apache.eagle.datastream.Tuple1;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.manager.EagleMetricReportManager;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.apache.eagle.metric.reportor.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
private Config config;
private Map<String, String> baseMetricDimension;
- private Map<String, EventMetric> eventMetrics;
- private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000;
- private static final String metricName = "kafka.message.user.count";
+ private MetricRegistry registry;
+ private EagleMetricListener listener;
+ private long granularity;
+ private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
- public static class EventMetric {
- long latestMessageTime;
- Metric metric;
-
- public EventMetric(long latestMessageTime, Metric metric) {
- this.latestMessageTime = latestMessageTime;
- this.metric = metric;
- }
-
- public void update(double d) {
- this.metric.update(d);
- }
- }
-
@Override
public void prepareConfig(Config config) {
this.config = config;
@@ -72,54 +56,45 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
this.baseMetricDimension = new HashMap<>();
this.baseMetricDimension.put("site", site);
this.baseMetricDimension.put("topic", topic);
- String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
- String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
- String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ registry = new MetricRegistry();
- EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
- EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
- eventMetrics = new ConcurrentHashMap<>();
- }
+ this.granularity = DEFAULT_METRIC_GRANULARITY;
+ if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
+ this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
+ }
- public long trimTimestamp(long timestamp, long granularity) {
- return timestamp / granularity * granularity;
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ listener = new EagleServiceReporterMetricListener(host, port, username, password);
}
- public void putNewMetric(long currentMessageTime, String user) {
- Map<String ,String> dimensions = new HashMap<>();
+ public String generateMetricKey(String user) {
+ Map<String, String> dimensions = new HashMap<>();
dimensions.putAll(baseMetricDimension);
dimensions.put("user", user);
- long trimTimestamp = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
- Metric metric = new CountingMetric(trimTimestamp, dimensions, metricName, 1);
- eventMetrics.put(user, new EventMetric(currentMessageTime, metric));
- }
-
- public void update(long currentMessageTime, String user) {
- if (eventMetrics.get(user) == null) {
- LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
- putNewMetric(currentMessageTime, user);
- }
- else {
- long latestMessageTime = eventMetrics.get(user).latestMessageTime;
- if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
- LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
- + ", latestMessageTime: " + latestMessageTime);
- EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
- putNewMetric(currentMessageTime, user);
- }
- else {
- eventMetrics.get(user).update(1);
- }
- }
+ String metricName = "eagle.kafka.message.count";
+ String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions);
+ return encodedMetricName;
}
@Override
public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
try {
String user = (String) input.get(0);
- Long timestamp = (Long) (input.get(1));
- update(timestamp, user);
+ Long timestamp = (Long) input.get(1);
+ String metricKey = generateMetricKey(user);
+ if (registry.getMetrics().get(metricKey) == null) {
+ EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
+ metric.registerListener(listener);
+ registry.register(metricKey, metric);
+ }
+ else {
+ EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey);
+ metric.update(1, timestamp);
+ //TODO: if we need to remove metric from registry
+ }
}
catch (Exception ex) {
LOG.error("Got an exception, ex: ", ex);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index aee817a..a03705f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,10 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.manager.EagleMetricReportManager;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.apache.eagle.metric.reportor.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,12 +31,13 @@ import java.util.Map;
public class KafkaOffsetSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
- private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000;
+ private static final long DEFAULT_ROUND_INTERVALS = 60 * 1000;
private KafkaOffsetCheckerConfig config;
private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
private KafkaLatestOffsetFetcher latestOffsetFetcher;
private Map<String, String> baseMetricDimension;
private long lastRoundTime = 0;
+ private EagleMetricListener listener;
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class);
@@ -57,20 +55,20 @@ public class KafkaOffsetSpout extends BaseRichSpout {
this.baseMetricDimension.put("site", config.kafkaConfig.site);
this.baseMetricDimension.put("topic", config.kafkaConfig.topic);
this.baseMetricDimension.put("group", config.kafkaConfig.group);
- String eagleServiceHost = config.serviceConfig.serviceHost;
- Integer eagleServicePort = config.serviceConfig.servicePort;
+ String host = config.serviceConfig.serviceHost;
+ Integer port = config.serviceConfig.servicePort;
String username = config.serviceConfig.username;
String password = config.serviceConfig.password;
- EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
- EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+ listener = new EagleServiceReporterMetricListener(host, port, username, password);
}
- public Metric constructMetric(long timestamp, String partition, double value) {
+ public EagleMetric constructMetric(long timestamp, String partition, double value) {
Map<String, String> dimensions = new HashMap<>();
dimensions.putAll(baseMetricDimension);
dimensions.put("partition", partition);
String metricName = "eagle.kafka.message.consumer.lag";
- Metric metric = new CountingMetric(timestamp, dimensions, metricName, value);
+ String metricKey = MetricKeyCodeDecoder.codeTSMetricKey(timestamp, metricName, dimensions);
+ EagleGaugeMetric metric = new EagleGaugeMetric(timestamp, metricKey, value);
return metric;
}
@@ -83,18 +81,21 @@ public class KafkaOffsetSpout extends BaseRichSpout {
Long currentTime = System.currentTimeMillis();
if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
try {
- long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
+ long trimCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
- List<Metric> list = new ArrayList<>();
+ List<EagleMetric> metrics = new ArrayList<>();
for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
String partition = entry.getKey();
Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
Long lag = latestOffset.get(partitionNumber) - entry.getValue();
- list.add(constructMetric(trimedCurrentTime, partition, lag));
+ // If the partition is not available
+ if (latestOffset.get(partitionNumber) == -1) lag = -1L;
+ EagleMetric metric = constructMetric(trimCurrentTime, partition, lag);
+ metrics.add(metric);
}
- lastRoundTime = trimedCurrentTime;
- EagleMetricReportManager.getInstance().emit(list);
+ lastRoundTime = trimCurrentTime;
+ listener.onMetricFlushed(metrics);
} catch (Exception ex) {
LOG.error("Got an exception, ex: ", ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/application.conf b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
index 4b07019..9c91744 100644
--- a/eagle-security/eagle-metric-collection/src/main/resources/application.conf
+++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
@@ -12,7 +12,7 @@
# For fetch gap
"site" : "sandbox",
"topic" : "sandbox_hdfs_audit_log",
- "zkQuorum" : "localhost:2191",
+ "zkQuorum" : "localhost:2181",
"hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer",
"zkSessionTimeoutMs" : 15000,
"zkRetryTimes" : 3,
@@ -26,7 +26,8 @@
#"transactionZKPort" : "2181",
"transactionZKRoot" : "/consumers",
#"transactionStateUpdateMS" : 2000,
- "kafkaEndPoints" : "localhost:9092"
+ "kafkaEndPoints" : "localhost:9092",
+ "kafkaDistributionDataIntervalMin" : 1
},
"eagleProps" : {
"eagleService": {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 7d32091..24deb65 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -79,7 +79,7 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
.endTime(endTime)
.pageSize(Integer.MAX_VALUE)
.query(query)
- .metricName("kafka.message.user.count")
+ .metricName("eagle.kafka.message.count")
.send();
if (!response.isSuccess()) {
LOG.error(response.getException());