You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/19 03:27:38 UTC

[23/50] incubator-eagle git commit: [EAGLE-618] migration eagle-jpm-aggregation to application framework

[EAGLE-618] migration eagle-jpm-aggregation to application framework

Author: wujinhu <wu...@126.com>

Closes #504 from wujinhu/EAGLE-618.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7bd5d1d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7bd5d1d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7bd5d1d2

Branch: refs/heads/master
Commit: 7bd5d1d2adb909eef7f59424f758fc0818fb683b
Parents: 0bca234
Author: wujinhu <wu...@126.com>
Authored: Fri Oct 14 11:56:03 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Oct 14 11:56:03 2016 +0800

----------------------------------------------------------------------
 .../jpm/aggregation/AggregationApplication.java |  18 ++-
 .../jpm/aggregation/AggregationConfig.java      |  10 --
 ...gregation.AggregationApplicationProvider.xml | 115 +++++++++++++++++++
 .../src/main/resources/application.conf         |  23 +---
 eagle-topology-assembly/pom.xml                 |   5 +
 ...org.apache.eagle.app.spi.ApplicationProvider |   3 +-
 6 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
index 3c40f58..0577070 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationApplication.java
@@ -31,22 +31,30 @@ import java.util.*;
 public class AggregationApplication extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
-        AggregationConfig aggregationConfig = AggregationConfig.getInstance(config);
-
         //TODO
-        List<String> metricNames = config.getStringList("aggregate.counters.metrics");
-        List<String> groupByColumns = config.getStringList("aggregate.counters.groupByColumns");
+        List<String> metricNames = new ArrayList<>();
+        String[] metricNamesArr = config.getString("aggregate.counters.metrics").split(",");
+        for (int i = 0; i < metricNamesArr.length; i++) {
+            metricNames.add(metricNamesArr[i]);
+        }
+        List<String> groupByColumns = new ArrayList<>();
+        String[] groupByColumnsArr = config.getString("aggregate.counters.groupBys").split(",");
+        for (int i = 0; i < groupByColumnsArr.length; i++) {
+            groupByColumns.add(groupByColumnsArr[i]);
+        }
+
         Map<String, List<List<String>>> metrics = new HashMap<>();
         for (String metric : metricNames) {
             metrics.put(metric, new ArrayList<>());
             for (String cols : groupByColumns) {
-                metrics.get(metric).add(Arrays.asList(cols.replaceAll(" ", "").split(",")));
+                metrics.get(metric).add(Arrays.asList(cols.replaceAll(" ", "").split("&")));
             }
         }
 
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrHistoryAggregationSpout";
         String boltName = "mrHistoryAggregationBolt";
+        AggregationConfig aggregationConfig = AggregationConfig.getInstance(config);
         int parallelism = aggregationConfig.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
         int tasks = aggregationConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
         if (parallelism > tasks) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
index c50de92..b527ddb 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
@@ -26,12 +26,6 @@ import java.io.Serializable;
 public class AggregationConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(AggregationConfig.class);
 
-    public String getEnv() {
-        return env;
-    }
-
-    private String env;
-
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -67,7 +61,6 @@ public class AggregationConfig implements Serializable {
 
     public static class JobExtractorConfig implements Serializable {
         public String site;
-        public int readTimeoutSeconds;
         public long aggregationDuration;
     }
 
@@ -104,10 +97,8 @@ public class AggregationConfig implements Serializable {
      */
     private void init(Config config) {
         this.config = config;
-        this.env = config.getString("envContextConfig.env");
         //parse eagle job extractor
         this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
         this.jobExtractorConfig.aggregationDuration = config.getLong("jobExtractorConfig.aggregationDuration");
 
         //parse eagle zk
@@ -126,7 +117,6 @@ public class AggregationConfig implements Serializable {
         this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
 
         LOG.info("Successfully initialized MRHistoryJobConfig");
-        LOG.info("env: " + this.env);
         LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
         LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
         LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
new file mode 100644
index 0000000..237a437
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>MR_JPM_AGGREGATION_APP</type>
+    <name>Map Job Monitoring Aggregation Application</name>
+    <version>0.5.0-incubating</version>
+    <configuration>
+        <property>
+            <name>jobExtractorConfig.site</name>
+            <displayName>Site ID</displayName>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>workers</name>
+            <displayName>storm worker number</displayName>
+            <value>4</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.mrHistoryAggregationSpout</name>
+            <value>1</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.mrHistoryAggregationSpout</name>
+            <value>1</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.mrHistoryAggregationBolt</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.mrHistoryAggregationBolt</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.aggregationDuration</name>
+            <description>seconds, each bolt process metrics from [start, start + aggregationDuration]</description>
+            <value>3600</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkQuorum</name>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkPort</name>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkRoot</name>
+            <value>/aggregation/mr/sandbox</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkSessionTimeoutMs</name>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkRetryTimes</name>
+            <value>3</value>
+        </property>
+        <property>
+            <name>zkStateConfig.zkRetryInterval</name>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+        <property>
+            <name>aggregate.counters.metrics</name>
+            <value>cpu_milliseconds,file_bytes_read,file_bytes_written,hdfs_bytes_read,hdfs_bytes_written,hdfs_read_ops,hdfs_write_ops</value>
+        </property>
+        <property>
+            <name>aggregate.counters.groupBys</name>
+            <description>groupBys that each metric need to aggregate. If group by many columns, then split them by &amp;</description>
+            <value>site&amp;jobType, site&amp;user, site</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+        </install>
+        <uninstall>
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
index 94e06b3..98fc009 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf
@@ -15,9 +15,6 @@
 
 {
   "envContextConfig" : {
-    "env" : "cluster",
-    "topologyName" : "mrHistoryAggregation",
-    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrHistoryAggregationSpout" : 1,
       "mrHistoryAggregationBolt" : 6
@@ -30,7 +27,6 @@
 
   "jobExtractorConfig" : {
     "site" : "sandbox",
-    "readTimeOutSeconds" : 10,
     "aggregationDuration" : 3600 #seconds, each bolt process metrics from [start, start + aggregationDuration]
   },
 
@@ -44,8 +40,6 @@
   },
 
   "eagleProps" : {
-    "mailHost" : "abc.com",
-    "mailDebug" : "true",
     "eagleService": {
       "host": "sandbox.hortonworks.com:2181",
       "port": 9099,
@@ -56,21 +50,8 @@
 
   "aggregate" : {
     "counters" : {
-      "metrics": [
-        "cpu_milliseconds",
-        "file_bytes_read",
-        "file_bytes_written",
-        "hdfs_bytes_read",
-        "hdfs_bytes_written",
-        "hdfs_read_ops",
-        "hdfs_write_ops"
-      ],
-
-      "groupByColumns": [
-        "site, jobType",
-        "site, user",
-        "site"
-      ]
+      "metrics" : "cpu_milliseconds,file_bytes_read, file_bytes_written, hdfs_bytes_read, hdfs_bytes_written, hdfs_read_ops, hdfs_write_ops",
+      "groupBys": "site&jobType, site&user, site"
     }
   },
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 80ca265..7af6f96 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -62,6 +62,11 @@
             <artifactId>eagle-jpm-web</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-aggregation</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7bd5d1d2/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 989886f..56292d2 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -17,4 +17,5 @@ org.apache.eagle.security.hbase.HBaseAuditLogAppProvider
 org.apache.eagle.app.example.ExampleApplicationProvider
 org.apache.eagle.app.jpm.JPMWebApplicationProvider
 org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
-org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
\ No newline at end of file
+org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
+org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
\ No newline at end of file