You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/10/19 07:48:18 UTC
incubator-eagle git commit: [EAGLE-563] migrate eagle-hadoop-queue to
use application framework
Repository: incubator-eagle
Updated Branches:
refs/heads/master 855b86ef3 -> 453c3a5fa
[EAGLE-563] migrate eagle-hadoop-queue to use application framework
Migrate eagle-hadoop-queue to use application framework.
Author: anyway1021 <mw...@apache.org>
Closes #528 from anyway1021/EAGLE-563.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/453c3a5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/453c3a5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/453c3a5f
Branch: refs/heads/master
Commit: 453c3a5fa209c6c6e40dc25be6951adada61f72f
Parents: 855b86e
Author: anyway1021 <mw...@apache.org>
Authored: Wed Oct 19 15:48:01 2016 +0800
Committer: anyway1021 <mw...@apache.org>
Committed: Wed Oct 19 15:48:01 2016 +0800
----------------------------------------------------------------------
eagle-jpm/eagle-hadoop-queue/pom.xml | 5 +
.../hadoop/queue/HadoopQueueRunningApp.java | 47 ++++++++++
.../queue/HadoopQueueRunningAppConfig.java | 98 ++++++++++++++++++++
.../queue/HadoopQueueRunningAppProvider.java | 25 +++++
.../hadoop/queue/HadoopQueueRunningMain.java | 80 ++--------------
.../storm/HadoopQueueMetricPersistBolt.java | 30 +++---
.../storm/HadoopQueueRunningExtractor.java | 13 +--
.../queue/storm/HadoopQueueRunningSpout.java | 10 +-
...doop.queue.HadoopQueueRunningAppProvider.xml | 77 +++++++++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 16 ++++
.../src/main/resources/application.conf | 11 ++-
.../HadoopQueueRunningAppProviderTest.java | 32 +++++++
.../hadoop/queue/HadoopQueueRunningAppTest.java | 27 ++++++
.../src/test/resources/application.conf | 11 ++-
eagle-server/pom.xml | 7 ++
eagle-topology-assembly/pom.xml | 5 +
...org.apache.eagle.app.spi.ApplicationProvider | 3 +-
17 files changed, 387 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml
index 91568ff..95929a9 100644
--- a/eagle-jpm/eagle-hadoop-queue/pom.xml
+++ b/eagle-jpm/eagle-hadoop-queue/pom.xml
@@ -57,6 +57,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
new file mode 100644
index 0000000..7a853a1
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.queue;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
+
+public class HadoopQueueRunningApp extends StormApplication {
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ HadoopQueueRunningAppConfig appConfig = HadoopQueueRunningAppConfig.getInstance(config);
+
+ IRichSpout spout = new HadoopQueueRunningSpout(appConfig);
+ HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
+ TopologyBuilder builder = new TopologyBuilder();
+
+ int numOfParserTasks = appConfig.topology.numOfParserTasks;
+ int numOfSpoutTasks = 1;
+
+ String spoutName = "runningQueueSpout";
+ String boltName = "parserBolt";
+
+ builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+ builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
+
+ return builder.createTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
new file mode 100644
index 0000000..5d5d736
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.Config;
+
+import java.io.Serializable;
+
+public class HadoopQueueRunningAppConfig implements Serializable {
+ public static final HadoopQueueRunningAppConfig instance = new HadoopQueueRunningAppConfig();
+
+ public Topology topology;
+ public DataSourceConfig dataSourceConfig;
+ public EagleProps eagleProps;
+
+ private Config config = null;
+
+ private HadoopQueueRunningAppConfig() {
+ this.topology = new Topology();
+ this.dataSourceConfig = new DataSourceConfig();
+ this.eagleProps = new EagleProps();
+ this.config = null;
+ }
+
+ public static class Topology implements Serializable {
+ public boolean localMode;
+ public int numOfParserTasks;
+ public String name;
+ }
+
+ public static class DataSourceConfig implements Serializable {
+ public String rMEndPoints;
+ public String fetchIntervalSec;
+ }
+
+ public static class EagleProps implements Serializable {
+ public String site;
+ public EagleService eagleService;
+
+ public EagleProps() {
+ eagleService = new EagleService();
+ }
+
+ public static class EagleService implements Serializable {
+ public String host;
+ public int port;
+ public String username;
+ public String password;
+ }
+ }
+
+ public static HadoopQueueRunningAppConfig getInstance(Config config) {
+ if (config != null && instance.config == null) {
+ synchronized (instance) {
+ if (instance.config == null) {
+ instance.init(config);
+ }
+ return instance;
+ }
+ }
+ return instance;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ private void init(Config config) {
+ this.config = config;
+
+ this.topology.localMode = config.getBoolean("topology.localMode");
+ this.topology.numOfParserTasks = config.getInt("topology.numOfParserTasks");
+ this.topology.name = config.getString("topology.name");
+
+ this.dataSourceConfig.rMEndPoints = config.getString("dataSourceConfig.rMEndPoints");
+ this.dataSourceConfig.fetchIntervalSec = config.getString("dataSourceConfig.fetchIntervalSec");
+
+ this.eagleProps.site = config.getString("eagleProps.site");
+ this.eagleProps.eagleService.host = config.getString("eagleProps.eagleService.host");
+ this.eagleProps.eagleService.port = config.getInt("eagleProps.eagleService.port");
+ this.eagleProps.eagleService.username = config.getString("eagleProps.eagleService.username");
+ this.eagleProps.eagleService.password = config.getString("eagleProps.eagleService.password");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
new file mode 100644
index 0000000..916dd5b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.queue;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider<HadoopQueueRunningApp> {
+ public HadoopQueueRunningApp getApplication() {
+ return new HadoopQueueRunningApp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
index 37fd17b..d6e90fa 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
@@ -8,84 +8,18 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.eagle.hadoop.queue;
-import org.apache.eagle.common.config.ConfigOptionParser;
-import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
-import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class HadoopQueueRunningMain {
- public static final String PARSER_TASK_NUM = "topology.numOfParserTasks";
- public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
- public static final String TOPOLOGY_NAME = "topology.name";
- public static final String LOCAL_MODE = "topology.localMode";
-
- private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningMain.class);
-
public static void main(String[] args) {
- //System.setProperty("config.resource", "/application.conf");
- //Config config = ConfigFactory.load();
- Config config = null;
- try {
- LOG.info("Loading from configuration file");
- config = new ConfigOptionParser().load(args);
- } catch (Exception e) {
- LOG.error("failed to load config");
- }
- IRichSpout spout = new HadoopQueueRunningSpout(config);
- HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(config);
- TopologyBuilder builder = new TopologyBuilder();
-
- int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
- int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
- int numOfSpoutTasks = 1;
-
- String spoutName = "runningQueueSpout";
- String boltName = "parserBolt";
-
- builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
- builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
-
- StormTopology topology = builder.createTopology();
-
- backtype.storm.Config stormConf = new backtype.storm.Config();
- stormConf.setNumWorkers(numOfTotalWorkers);
- stormConf.put(stormConf.TOPOLOGY_DEBUG, true);
-
- String topoName = config.getString(TOPOLOGY_NAME);
- Boolean local = config.getBoolean(LOCAL_MODE);
- try {
- if (!local) {
- StormSubmitter.submitTopology(topoName, stormConf, topology);
- } else {
- //local mode
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topoName, stormConf, topology);
- }
- } catch (InvalidTopologyException e) {
- e.printStackTrace();
- } catch (AlreadyAliveException e) {
- e.printStackTrace();
- }
-
+ new HadoopQueueRunningApp().run(args);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index c6c204a..4edf27d 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -8,16 +8,22 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
package org.apache.eagle.hadoop.queue.storm;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
@@ -25,15 +31,9 @@ import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
@@ -41,17 +41,17 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
- private Config config;
+ private HadoopQueueRunningAppConfig config;
private IEagleServiceClient client;
private OutputCollector collector;
- public HadoopQueueMetricPersistBolt(Config config) {
+ public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) {
this.config = config;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config));
+ this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig()));
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
index ef0c762..c5e0654 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
@@ -18,7 +18,8 @@
package org.apache.eagle.hadoop.queue.storm;
-import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl;
import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
@@ -26,9 +27,6 @@ import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ public class HadoopQueueRunningExtractor {
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class);
private static final int MAX_NUM_THREADS = 10;
private static final int MAX_WAIT_TIME = 10;
- private static final String DEFAULT_SITE = "sandbox";
private String site;
private String urlBases;
@@ -51,9 +48,9 @@ public class HadoopQueueRunningExtractor {
private ExecutorService executorService;
private SpoutOutputCollector collector;
- public HadoopQueueRunningExtractor(Config eagleConf, SpoutOutputCollector collector) {
- site = HadoopYarnResourceUtils.getConfigValue(eagleConf, "eagleProps.site", DEFAULT_SITE);
- urlBases = HadoopYarnResourceUtils.getConfigValue(eagleConf, "dataSourceConfig.RMEndPoints", "");
+ public HadoopQueueRunningExtractor(HadoopQueueRunningAppConfig eagleConf, SpoutOutputCollector collector) {
+ site = eagleConf.eagleProps.site;
+ urlBases = eagleConf.dataSourceConfig.rMEndPoints;
if (urlBases == null) {
throw new IllegalArgumentException(site + ".baseurl is null");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
index 7053a09..530be9a 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
@@ -18,6 +18,8 @@
package org.apache.eagle.hadoop.queue.storm;
+import org.apache.eagle.hadoop.queue.HadoopQueueRunningApp;
+import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
@@ -35,18 +37,16 @@ import java.util.Map;
public class HadoopQueueRunningSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningSpout.class);
- private static final String FETCH_INTERVAL_CONF = "dataSourceConfig.FetchIntervalSec";
- private static final String DEFAULT_FETCH_INTERVAL_SECONDS = "10";
private long fetchIntervalSec;
private long lastFetchTime = 0;
private HadoopQueueRunningExtractor extractor;
- private Config config;
+ private HadoopQueueRunningAppConfig config;
- public HadoopQueueRunningSpout(Config config) {
+ public HadoopQueueRunningSpout(HadoopQueueRunningAppConfig config) {
this.config = config;
- fetchIntervalSec = Long.parseLong(HadoopYarnResourceUtils.getConfigValue(config, FETCH_INTERVAL_CONF, DEFAULT_FETCH_INTERVAL_SECONDS));
+ fetchIntervalSec = Long.parseLong(config.dataSourceConfig.fetchIntervalSec);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
new file mode 100644
index 0000000..02b60ef
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>HADOOP_QUEUE_RUNNING_APP</type>
+ <name>Hadoop Queue Running Monitoring</name>
+ <version>0.5.0-incubating</version>
+ <configuration>
+ <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig -->
+ <property>
+ <name>workers</name>
+ <displayName>storm worker number</displayName>
+ <value>4</value>
+ </property>
+ <property>
+ <name>topology.localMode</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>topology.numOfParserTasks</name>
+ <value>2</value>
+ </property>
+ <property>
+ <name>topology.name</name>
+ <value>sandbox-running-queue-topology</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.rMEndPoints</name>
+ <value>http://sandbox.hortonworks.com:8088/</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.fetchIntervalSec</name>
+ <value>10</value>
+ </property>
+ <property>
+ <name>eagleProps.site</name>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <value>localhost</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <value>secret</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ </install>
+ <uninstall>
+ </uninstall>
+ </docs>
+</application>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..a35bb7d
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
index 77ae8be..807bd5b 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf
@@ -16,13 +16,12 @@
{
"topology" : {
"localMode" : true,
- "numOfTotalWorkers" : 2,
"numOfParserTasks" : 2,
"name" : "sandbox-running-queue-topology",
},
"dataSourceConfig": {
- "RMEndPoints" : "http://sandbox.hortonworks.com:8088/",
- "FetchIntervalSec": "10"
+ "rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
+ "fetchIntervalSec": "10"
},
"eagleProps" : {
"site": "sandbox",
@@ -33,4 +32,8 @@
"password": "secret"
}
}
-}
\ No newline at end of file
+ "appId":"hadoopQueueMonitorJob",
+ "mode":"LOCAL",
+ application.storm.nimbusHost=localhost,
+ "workers":4,
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java
new file mode 100644
index 0000000..633e802
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.queue;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.junit.Test;
+
+public class HadoopQueueRunningAppProviderTest extends ApplicationTestBase {
+ @Inject
+ private ApplicationSimulator simulator;
+
+ @Test
+ public void testRunAsManagedApplicationWithSimulator() {
+ simulator.start(HadoopQueueRunningAppProvider.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
new file mode 100644
index 0000000..32ed320
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class HadoopQueueRunningAppTest {
+ @Test
+ public void testRun(){
+ new HadoopQueueRunningApp().run(ConfigFactory.load());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
index 77ae8be..807bd5b 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
+++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf
@@ -16,13 +16,12 @@
{
"topology" : {
"localMode" : true,
- "numOfTotalWorkers" : 2,
"numOfParserTasks" : 2,
"name" : "sandbox-running-queue-topology",
},
"dataSourceConfig": {
- "RMEndPoints" : "http://sandbox.hortonworks.com:8088/",
- "FetchIntervalSec": "10"
+ "rMEndPoints" : "http://sandbox.hortonworks.com:8088/",
+ "fetchIntervalSec": "10"
},
"eagleProps" : {
"site": "sandbox",
@@ -33,4 +32,8 @@
"password": "secret"
}
}
-}
\ No newline at end of file
+ "appId":"hadoopQueueMonitorJob",
+ "mode":"LOCAL",
+ application.storm.nimbusHost=localhost,
+ "workers":4,
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 07f02ec..a07cc89 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -309,6 +309,13 @@
<artifactId>eagle-jpm-service</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- App: Hadoop Queue Running Monitoring-->
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-hadoop-queue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 7af6f96..4ea2b0a 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -67,6 +67,11 @@
<artifactId>eagle-jpm-aggregation</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-hadoop-queue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 56292d2..8d35f31 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -18,4 +18,5 @@ org.apache.eagle.app.example.ExampleApplicationProvider
org.apache.eagle.app.jpm.JPMWebApplicationProvider
org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
-org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
\ No newline at end of file
+org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
+org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider