You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 03:27:38 UTC
[23/50] incubator-eagle git commit: [EAGLE-618] migration
eagle-jpm-aggregation to application framework
[EAGLE-618] migration eagle-jpm-aggregation to application framework
Author: wujinhu <wu...@126.com>
Closes #504 from wujinhu/EAGLE-618.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7bd5d1d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7bd5d1d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7bd5d1d2
Branch: refs/heads/master
Commit: 7bd5d1d2adb909eef7f59424f758fc0818fb683b
Parents: 0bca234
Author: wujinhu <wu...@126.com>
Authored: Fri Oct 14 11:56:03 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Oct 14 11:56:03 2016 +0800
----------------------------------------------------------------------
.../jpm/aggregation/AggregationApplication.java | 18 ++-
.../jpm/aggregation/AggregationConfig.java | 10 --
...gregation.AggregationApplicationProvider.xml | 115 +++++++++++++++++++
.../src/main/resources/application.conf | 23 +---
eagle-topology-assembly/pom.xml | 5 +
...org.apache.eagle.app.spi.ApplicationProvider | 3 +-
6 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
index 3c40f58..0577070 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
@@ -31,22 +31,30 @@ import java.util.*;
public class AggregationApplication extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
- AggregationConfig aggregationConfig = AggregationConfig.getInstance(config);
-
//TODO
- List<String> metricNames = config.getStringList("aggregate.counters.metrics");
- List<String> groupByColumns = config.getStringList("aggregate.counters.groupByColumns");
+ List<String> metricNames = new ArrayList<>();
+ String[] metricNamesArr = config.getString("aggregate.counters.metrics").split(",");
+ for (int i = 0; i < metricNamesArr.length; i++) {
+ metricNames.add(metricNamesArr[i]);
+ }
+ List<String> groupByColumns = new ArrayList<>();
+ String[] groupByColumnsArr = config.getString("aggregate.counters.groupBys").split(",");
+ for (int i = 0; i < groupByColumnsArr.length; i++) {
+ groupByColumns.add(groupByColumnsArr[i]);
+ }
+
Map<String, List<List<String>>> metrics = new HashMap<>();
for (String metric : metricNames) {
metrics.put(metric, new ArrayList<>());
for (String cols : groupByColumns) {
- metrics.get(metric).add(Arrays.asList(cols.replaceAll(" ", "").split(",")));
+ metrics.get(metric).add(Arrays.asList(cols.replaceAll(" ", "").split("&")));
}
}
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spoutName = "mrHistoryAggregationSpout";
String boltName = "mrHistoryAggregationBolt";
+ AggregationConfig aggregationConfig = AggregationConfig.getInstance(config);
int parallelism = aggregationConfig.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
int tasks = aggregationConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
if (parallelism > tasks) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
index c50de92..b527ddb 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
@@ -26,12 +26,6 @@ import java.io.Serializable;
public class AggregationConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(AggregationConfig.class);
- public String getEnv() {
- return env;
- }
-
- private String env;
-
public ZKStateConfig getZkStateConfig() {
return zkStateConfig;
}
@@ -67,7 +61,6 @@ public class AggregationConfig implements Serializable {
public static class JobExtractorConfig implements Serializable {
public String site;
- public int readTimeoutSeconds;
public long aggregationDuration;
}
@@ -104,10 +97,8 @@ public class AggregationConfig implements Serializable {
*/
private void init(Config config) {
this.config = config;
- this.env = config.getString("envContextConfig.env");
//parse eagle job extractor
this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
- this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
this.jobExtractorConfig.aggregationDuration = config.getLong("jobExtractorConfig.aggregationDuration");
//parse eagle zk
@@ -126,7 +117,6 @@ public class AggregationConfig implements Serializable {
this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
LOG.info("Successfully initialized MRHistoryJobConfig");
- LOG.info("env: " + this.env);
LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
new file mode 100644
index 0000000..237a437
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>MR_JPM_AGGREGATION_APP</type>
+ <name>Map Job Monitoring Aggregation Application</name>
+ <version>0.5.0-incubating</version>
+ <configuration>
+ <property>
+ <name>jobExtractorConfig.site</name>
+ <displayName>Site ID</displayName>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>workers</name>
+ <displayName>storm worker number</displayName>
+ <value>4</value>
+ </property>
+ <property>
+ <name>envContextConfig.parallelismConfig.mrHistoryAggregationSpout</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>envContextConfig.tasks.mrHistoryAggregationSpout</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>envContextConfig.parallelismConfig.mrHistoryAggregationBolt</name>
+ <value>6</value>
+ </property>
+ <property>
+ <name>envContextConfig.tasks.mrHistoryAggregationBolt</name>
+ <value>6</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.aggregationDuration</name>
+ <description>seconds, each bolt process metrics from [start, start + aggregationDuration]</description>
+ <value>3600</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkQuorum</name>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkPort</name>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkRoot</name>
+ <value>/aggregation/mr/sandbox</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkSessionTimeoutMs</name>
+ <value>15000</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkRetryTimes</name>
+ <value>3</value>
+ </property>
+ <property>
+ <name>zkStateConfig.zkRetryInterval</name>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <description>eagleProps.eagleService.host</description>
+ <value>sandbox.hortonworks.com</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <description>eagleProps.eagleService.port</description>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <description>eagleProps.eagleService.username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <description>eagleProps.eagleService.password</description>
+ <value>secret</value>
+ </property>
+ <property>
+ <name>aggregate.counters.metrics</name>
+ <value>cpu_milliseconds, file_bytes_read, file_bytes_written, hdfs_bytes_read, hdfs_bytes_written, hdfs_read_ops, hdfs_write_ops</value>
+ </property>
+ <property>
+ <name>aggregate.counters.groupBys</name>
+ <description>groupBys that each metric need to aggregate. If group by many columns, then split them by &</description>
+ <value>site&jobType, site&user, site</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ </install>
+ <uninstall>
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
index 94e06b3..98fc009 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
@@ -15,9 +15,6 @@
{
"envContextConfig" : {
- "env" : "cluster",
- "topologyName" : "mrHistoryAggregation",
- "stormConfigFile" : "storm.yaml",
"parallelismConfig" : {
"mrHistoryAggregationSpout" : 1,
"mrHistoryAggregationBolt" : 6
@@ -30,7 +27,6 @@
"jobExtractorConfig" : {
"site" : "sandbox",
- "readTimeOutSeconds" : 10,
"aggregationDuration" : 3600 #seconds, each bolt process metrics from [start, start + aggregationDuration]
},
@@ -44,8 +40,6 @@
},
"eagleProps" : {
- "mailHost" : "abc.com",
- "mailDebug" : "true",
"eagleService": {
"host": "sandbox.hortonworks.com:2181",
"port": 9099,
@@ -56,21 +50,8 @@
"aggregate" : {
"counters" : {
- "metrics": [
- "cpu_milliseconds",
- "file_bytes_read",
- "file_bytes_written",
- "hdfs_bytes_read",
- "hdfs_bytes_written",
- "hdfs_read_ops",
- "hdfs_write_ops"
- ],
-
- "groupByColumns": [
- "site, jobType",
- "site, user",
- "site"
- ]
+ "metrics" : "cpu_milliseconds,file_bytes_read, file_bytes_written, hdfs_bytes_read, hdfs_bytes_written, hdfs_read_ops, hdfs_write_ops",
+ "groupBys": "site&jobType, site&user, site"
}
},
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 80ca265..7af6f96 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -62,6 +62,11 @@
<artifactId>eagle-jpm-web</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-aggregation</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 989886f..56292d2 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -17,4 +17,5 @@ org.apache.eagle.security.hbase.HBaseAuditLogAppProvider
org.apache.eagle.app.example.ExampleApplicationProvider
org.apache.eagle.app.jpm.JPMWebApplicationProvider
org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
-org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
\ No newline at end of file
+org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
+org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
\ No newline at end of file