You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:06 UTC
[10/52] [abbrv] incubator-eagle git commit: [EAGLE-460] Convert MR running app with new app framework
[EAGLE-460] Convert MR running app with new app framework
https://issues.apache.org/jira/browse/EAGLE-460
Author: Hao Chen <ha...@apache.org>
Closes #381 from haoch/EAGLE-460.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b54a63e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b54a63e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b54a63e0
Branch: refs/heads/master
Commit: b54a63e0497f6dbca6203fc3024ca649bc767f25
Parents: 0b852cb
Author: Hao Chen <ha...@apache.org>
Authored: Wed Aug 24 21:25:57 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Aug 24 21:25:57 2016 +0800
----------------------------------------------------------------------
.../queue/model/scheduler/SchedulerInfo.java | 86 ++++-----
.../queue/model/scheduler/SchedulerWrapper.java | 16 +-
.../hadoop/queue/model/scheduler/User.java | 74 ++++----
.../storm/HadoopQueueMetricPersistBolt.java | 31 ++--
.../storm/HadoopQueueRunningExtractor.java | 23 +--
.../apache/eagle/app/jpm/JPMApplication.java | 13 +-
eagle-jpm/eagle-jpm-mr-running/pom.xml | 5 +
.../jpm/mr/running/MRRunningJobApplication.java | 77 ++++++++
.../MRRunningJobApplicationProvider.java | 26 +++
.../jpm/mr/running/MRRunningJobConfig.java | 156 +++++++++++++++++
.../eagle/jpm/mr/running/MRRunningJobMain.java | 75 +-------
.../running/config/MRRunningConfigManager.java | 151 ----------------
.../parser/MRJobEntityCreationHandler.java | 6 +-
.../jpm/mr/running/parser/MRJobParser.java | 34 ++--
.../mr/running/recover/MRRunningJobManager.java | 8 +-
.../running/storm/MRRunningJobFetchSpout.java | 30 ++--
.../mr/running/storm/MRRunningJobParseBolt.java | 34 ++--
....running.MRRunningJobApplicationProvider.xml | 175 +++++++++++++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 16 ++
.../src/main/resources/application.conf | 8 +-
.../MRRunningJobApplicationProviderTest.java | 35 ++++
.../mr/running/MRRunningJobApplicationTest.java | 27 +++
.../service/jpm/MRJobExecutionResource.java | 34 ++--
.../storm/SparkRunningJobFetchSpout.java | 14 +-
.../running/storm/SparkRunningJobParseBolt.java | 5 +-
25 files changed, 733 insertions(+), 426 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
index 8ed7745..8c51600 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
@@ -21,63 +21,63 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SchedulerInfo {
- private String type;
- private double capacity;
- private double usedCapacity;
- private double maxCapacity;
- private String queueName;
- private Queues queues;
+ private String type;
+ private double capacity;
+ private double usedCapacity;
+ private double maxCapacity;
+ private String queueName;
+ private Queues queues;
- public Queues getQueues() {
- return queues;
- }
+ public Queues getQueues() {
+ return queues;
+ }
- public void setQueues(Queues queues) {
- this.queues = queues;
- }
+ public void setQueues(Queues queues) {
+ this.queues = queues;
+ }
- public double getUsedCapacity() {
- return usedCapacity;
- }
+ public double getUsedCapacity() {
+ return usedCapacity;
+ }
- public void setUsedCapacity(double usedCapacity) {
- this.usedCapacity = usedCapacity;
- }
+ public void setUsedCapacity(double usedCapacity) {
+ this.usedCapacity = usedCapacity;
+ }
- public String getType() {
- return type;
- }
+ public String getType() {
+ return type;
+ }
- public void setType(String type) {
- this.type = type;
- }
+ public void setType(String type) {
+ this.type = type;
+ }
- public double getCapacity() {
- return capacity;
- }
+ public double getCapacity() {
+ return capacity;
+ }
- public void setCapacity(double capacity) {
- this.capacity = capacity;
- }
+ public void setCapacity(double capacity) {
+ this.capacity = capacity;
+ }
- public double getMaxCapacity() {
- return maxCapacity;
- }
+ public double getMaxCapacity() {
+ return maxCapacity;
+ }
- public void setMaxCapacity(double maxCapacity) {
- this.maxCapacity = maxCapacity;
- }
+ public void setMaxCapacity(double maxCapacity) {
+ this.maxCapacity = maxCapacity;
+ }
- public String getQueueName() {
- return queueName;
- }
+ public String getQueueName() {
+ return queueName;
+ }
- public void setQueueName(String queueName) {
- this.queueName = queueName;
- }
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
index f181f2f..61b3685 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
@@ -21,16 +21,16 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SchedulerWrapper {
- public Scheduler getScheduler() {
- return scheduler;
- }
+ public Scheduler getScheduler() {
+ return scheduler;
+ }
- public void setScheduler(Scheduler scheduler) {
- this.scheduler = scheduler;
- }
+ public void setScheduler(Scheduler scheduler) {
+ this.scheduler = scheduler;
+ }
- private Scheduler scheduler;
+ private Scheduler scheduler;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
index bac9eb2..7bddf71 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
@@ -21,43 +21,43 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
- private String username;
- private ResourcesUsed resourcesUsed;
- private int numPendingApplications;
- private int numActiveApplications;
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public ResourcesUsed getResourcesUsed() {
- return resourcesUsed;
- }
-
- public void setResourcesUsed(ResourcesUsed resourcesUsed) {
- this.resourcesUsed = resourcesUsed;
- }
-
- public int getNumPendingApplications() {
- return numPendingApplications;
- }
-
- public void setNumPendingApplications(int numPendingApplications) {
- this.numPendingApplications = numPendingApplications;
- }
-
- public int getNumActiveApplications() {
- return numActiveApplications;
- }
-
- public void setNumActiveApplications(int numActiveApplications) {
- this.numActiveApplications = numActiveApplications;
- }
+ private String username;
+ private ResourcesUsed resourcesUsed;
+ private int numPendingApplications;
+ private int numActiveApplications;
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public ResourcesUsed getResourcesUsed() {
+ return resourcesUsed;
+ }
+
+ public void setResourcesUsed(ResourcesUsed resourcesUsed) {
+ this.resourcesUsed = resourcesUsed;
+ }
+
+ public int getNumPendingApplications() {
+ return numPendingApplications;
+ }
+
+ public void setNumPendingApplications(int numPendingApplications) {
+ this.numPendingApplications = numPendingApplications;
+ }
+
+ public int getNumActiveApplications() {
+ return numActiveApplications;
+ }
+
+ public void setNumActiveApplications(int numActiveApplications) {
+ this.numActiveApplications = numActiveApplications;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index c99ea41..db61841 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -18,19 +18,20 @@
package org.apache.eagle.hadoop.queue.storm;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ import java.util.Map;
public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
- private final static Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
private Config config;
private IEagleServiceClient client;
@@ -65,7 +66,7 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) {
List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
writeMetrics(metrics);
- } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
+ } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
writeEntities(entities);
}
@@ -77,12 +78,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
}
- private void writeEntities(List<RunningQueueAPIEntity> entities){
+ private void writeEntities(List<RunningQueueAPIEntity> entities) {
try {
GenericServiceAPIResponseEntity response = client.create(entities);
- if(!response.isSuccess()){
+ if (!response.isSuccess()) {
LOG.error("Got exception from eagle service: " + response.getException());
- }else{
+ } else {
LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities");
}
} catch (Exception e) {
@@ -91,12 +92,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
entities.clear();
}
- private void writeMetrics(List<GenericMetricEntity> entities){
+ private void writeMetrics(List<GenericMetricEntity> entities) {
try {
GenericServiceAPIResponseEntity response = client.create(entities);
- if(response.isSuccess()){
+ if (response.isSuccess()) {
LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities");
- }else{
+ } else {
LOG.error(response.getException());
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
index 3c4391b..ef0c762 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
@@ -18,16 +18,17 @@
package org.apache.eagle.hadoop.queue.storm;
-import backtype.storm.spout.SpoutOutputCollector;
-import com.typesafe.config.Config;
-import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
-import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
-import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl;
+import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
+import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
+import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,10 +39,10 @@ import java.util.concurrent.*;
public class HadoopQueueRunningExtractor {
- private final static Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class);
- private final static int MAX_NUM_THREADS = 10;
- private final static int MAX_WAIT_TIME = 10;
- private final static String DEFAULT_SITE = "sandbox";
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class);
+ private static final int MAX_NUM_THREADS = 10;
+ private static final int MAX_WAIT_TIME = 10;
+ private static final String DEFAULT_SITE = "sandbox";
private String site;
private String urlBases;
@@ -53,10 +54,10 @@ public class HadoopQueueRunningExtractor {
public HadoopQueueRunningExtractor(Config eagleConf, SpoutOutputCollector collector) {
site = HadoopYarnResourceUtils.getConfigValue(eagleConf, "eagleProps.site", DEFAULT_SITE);
urlBases = HadoopYarnResourceUtils.getConfigValue(eagleConf, "dataSourceConfig.RMEndPoints", "");
- if(urlBases == null){
+ if (urlBases == null) {
throw new IllegalArgumentException(site + ".baseurl is null");
}
- String [] urls = urlBases.split(",");
+ String[] urls = urlBases.split(",");
urlSelector = new YarnURLSelectorImpl(urls, Constants.CompressionType.GZIP);
executorService = Executors.newFixedThreadPool(MAX_NUM_THREADS);
this.collector = collector;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index d084008..68b7eff 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -16,6 +16,9 @@
*/
package org.apache.eagle.app.jpm;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
@@ -24,8 +27,6 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
import java.util.Arrays;
import java.util.Map;
@@ -41,16 +42,16 @@ public class JPMApplication extends StormApplication {
}
private class RandomEventSpout extends BaseRichSpout {
- private SpoutOutputCollector _collector;
+ private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- _collector = spoutOutputCollector;
+ collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
- _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
- _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index 414a221..d4ca4ce 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -80,6 +80,11 @@
<artifactId>eagle-jpm-entity</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
new file mode 100644
index 0000000..21ee1d9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+public class MRRunningJobApplication extends StormApplication {
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ //1. trigger init conf
+ MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.getInstance(config);
+
+ List<String> confKeyKeys = mrRunningJobConfig.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
+ confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+ confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
+
+ //2. init topology
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ String spoutName = "mrRunningJobFetchSpout";
+ String boltName = "mrRunningJobParseBolt";
+ int parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+ int tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setSpout(
+ spoutName,
+ new MRRunningJobFetchSpout(
+ mrRunningJobConfig.getJobExtractorConfig(),
+ mrRunningJobConfig.getEndpointConfig(),
+ mrRunningJobConfig.getZkStateConfig()),
+ parallelism
+ ).setNumTasks(tasks);
+
+ parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
+ tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + boltName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setBolt(boltName,
+ new MRRunningJobParseBolt(
+ mrRunningJobConfig.getEagleServiceConfig(),
+ mrRunningJobConfig.getEndpointConfig(),
+ mrRunningJobConfig.getJobExtractorConfig(),
+ mrRunningJobConfig.getZkStateConfig(),
+ confKeyKeys),
+ parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+ return topologyBuilder.createTopology();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
new file mode 100644
index 0000000..45a841b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class MRRunningJobApplicationProvider extends AbstractApplicationProvider<MRRunningJobApplication> {
+ @Override
+ public MRRunningJobApplication getApplication() {
+ return new MRRunningJobApplication();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
new file mode 100644
index 0000000..ec6740b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.common.config.ConfigOptionParser;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class MRRunningJobConfig implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobConfig.class);
+
+ public String getEnv() {
+ return env;
+ }
+
+ private String env;
+
+ public ZKStateConfig getZkStateConfig() {
+ return zkStateConfig;
+ }
+
+ private ZKStateConfig zkStateConfig;
+
+ public EagleServiceConfig getEagleServiceConfig() {
+ return eagleServiceConfig;
+ }
+
+ private EagleServiceConfig eagleServiceConfig;
+
+ public JobExtractorConfig getJobExtractorConfig() {
+ return jobExtractorConfig;
+ }
+
+ private JobExtractorConfig jobExtractorConfig;
+
+ public EndpointConfig getEndpointConfig() {
+ return endpointConfig;
+ }
+
+ private EndpointConfig endpointConfig;
+
+ public static class ZKStateConfig implements Serializable {
+ public String zkQuorum;
+ public String zkRoot;
+ public int zkSessionTimeoutMs;
+ public int zkRetryTimes;
+ public int zkRetryInterval;
+ public String zkPort;
+ }
+
+ public static class EagleServiceConfig implements Serializable {
+ public String eagleServiceHost;
+ public int eagleServicePort;
+ public int readTimeoutSeconds;
+ public int maxFlushNum;
+ public String username;
+ public String password;
+ }
+
+ public static class JobExtractorConfig implements Serializable {
+ public String site;
+ public int fetchRunningJobInterval;
+ public int parseJobThreadPoolSize;
+ public int topAndBottomTaskByElapsedTime;
+ }
+
+ public static class EndpointConfig implements Serializable {
+ public String[] rmUrls;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ private Config config;
+
+ private static MRRunningJobConfig manager = new MRRunningJobConfig();
+
+ private MRRunningJobConfig() {
+ this.eagleServiceConfig = new EagleServiceConfig();
+ this.jobExtractorConfig = new JobExtractorConfig();
+ this.endpointConfig = new EndpointConfig();
+ this.zkStateConfig = new ZKStateConfig();
+ }
+
+ public static MRRunningJobConfig getInstance(String[] args) {
+ try {
+ LOG.info("Loading from configuration file");
+ return getInstance(new ConfigOptionParser().load(args));
+ } catch (Exception e) {
+ LOG.error("failed to load config");
+ throw new IllegalArgumentException("Failed to load config", e);
+ }
+ }
+
+ public static MRRunningJobConfig getInstance(Config config) {
+ manager.init(config);
+ return manager;
+ }
+
+ private void init(Config config) {
+ this.config = config;
+ this.env = config.getString("envContextConfig.env");
+
+ //parse eagle zk
+ this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+ this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+ this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+ this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+
+ // parse eagle service endpoint
+ this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+ String port = config.getString("eagleProps.eagleService.port");
+ this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
+ this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+ this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+ this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+ this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+ //parse job extractor
+ this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+ this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+ this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
+ this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+
+ //parse data source config
+ this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+
+ LOG.info("Successfully initialized MRRunningJobConfig");
+ LOG.info("env: " + this.env);
+ LOG.info("site: " + this.jobExtractorConfig.site);
+ LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+ LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
index 87079fd..a3d6d74 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -18,79 +18,8 @@
package org.apache.eagle.jpm.mr.running;
-
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
-import org.apache.eagle.jpm.util.Constants;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-
public class MRRunningJobMain {
public static void main(String[] args) {
-
- try {
- //1. trigger init conf
- MRRunningConfigManager mrRunningConfigManager = MRRunningConfigManager.getInstance(args);
-
- List<String> confKeyKeys = mrRunningConfigManager.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
- confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
- confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
- confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
- confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
- confKeyKeys.add(0, mrRunningConfigManager.getConfig().getString("MRConfigureKeys.jobNameKey"));
-
- //2. init topology
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- String topologyName = mrRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
- String spoutName = "mrRunningJobFetchSpout";
- String boltName = "mrRunningJobParseBolt";
- int parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
- int tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
- topologyBuilder.setSpout(
- spoutName,
- new MRRunningJobFetchSpout(
- mrRunningConfigManager.getJobExtractorConfig(),
- mrRunningConfigManager.getEndpointConfig(),
- mrRunningConfigManager.getZkStateConfig()),
- parallelism
- ).setNumTasks(tasks);
-
- parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
- tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
- topologyBuilder.setBolt(boltName,
- new MRRunningJobParseBolt(
- mrRunningConfigManager.getEagleServiceConfig(),
- mrRunningConfigManager.getEndpointConfig(),
- mrRunningConfigManager.getJobExtractorConfig(),
- mrRunningConfigManager.getZkStateConfig(),
- confKeyKeys),
- parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
-
- backtype.storm.Config config = new backtype.storm.Config();
- config.setNumWorkers(mrRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
- config.put(Config.TOPOLOGY_DEBUG, true);
- if (!mrRunningConfigManager.getEnv().equals("local")) {
- //cluster mode
- //parse conf here
- StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
- } else {
- //local mode
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+ new MRRunningJobApplication().run(args);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
deleted file mode 100644
index 42426e4..0000000
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.running.config;
-
-import org.apache.eagle.common.config.ConfigOptionParser;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class MRRunningConfigManager implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
-
- public String getEnv() {
- return env;
- }
-
- private String env;
-
- public ZKStateConfig getZkStateConfig() {
- return zkStateConfig;
- }
-
- private ZKStateConfig zkStateConfig;
-
- public EagleServiceConfig getEagleServiceConfig() {
- return eagleServiceConfig;
- }
-
- private EagleServiceConfig eagleServiceConfig;
-
- public JobExtractorConfig getJobExtractorConfig() {
- return jobExtractorConfig;
- }
-
- private JobExtractorConfig jobExtractorConfig;
-
- public EndpointConfig getEndpointConfig() {
- return endpointConfig;
- }
-
- private EndpointConfig endpointConfig;
-
- public static class ZKStateConfig implements Serializable {
- public String zkQuorum;
- public String zkRoot;
- public int zkSessionTimeoutMs;
- public int zkRetryTimes;
- public int zkRetryInterval;
- public String zkPort;
- }
-
- public static class EagleServiceConfig implements Serializable {
- public String eagleServiceHost;
- public int eagleServicePort;
- public int readTimeoutSeconds;
- public int maxFlushNum;
- public String username;
- public String password;
- }
-
- public static class JobExtractorConfig implements Serializable {
- public String site;
- public int fetchRunningJobInterval;
- public int parseJobThreadPoolSize;
- public int topAndBottomTaskByElapsedTime;
- }
-
- public static class EndpointConfig implements Serializable {
- public String[] rmUrls;
- }
-
- public Config getConfig() {
- return config;
- }
-
- private Config config;
-
- private static MRRunningConfigManager manager = new MRRunningConfigManager();
-
- private MRRunningConfigManager() {
- this.eagleServiceConfig = new EagleServiceConfig();
- this.jobExtractorConfig = new JobExtractorConfig();
- this.endpointConfig = new EndpointConfig();
- this.zkStateConfig = new ZKStateConfig();
- }
-
- public static MRRunningConfigManager getInstance(String[] args) {
- manager.init(args);
- return manager;
- }
-
- private void init(String[] args) {
- try {
- LOG.info("Loading from configuration file");
- this.config = new ConfigOptionParser().load(args);
- } catch (Exception e) {
- LOG.error("failed to load config");
- }
-
- this.env = config.getString("envContextConfig.env");
-
- //parse eagle zk
- this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
- this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
- this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
- this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
-
- // parse eagle service endpoint
- this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
- String port = config.getString("eagleProps.eagleService.port");
- this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
- this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
- this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
- this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
- this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
- //parse job extractor
- this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
- this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
- this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
- this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
-
- //parse data source config
- this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
-
- LOG.info("Successfully initialized MRRunningConfigManager");
- LOG.info("env: " + this.env);
- LOG.info("site: " + this.jobExtractorConfig.site);
- LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
- LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index a43ed16..1a0fb61 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.running.parser;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.running.parser.metrics.TaskExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -37,11 +37,11 @@ public class MRJobEntityCreationHandler {
private static final Logger LOG = LoggerFactory.getLogger(MRJobEntityCreationHandler.class);
private List<TaggedLogAPIEntity> entities = new ArrayList<>();
- private MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+ private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
private JobExecutionMetricsCreationListener jobMetricsListener;
private TaskExecutionMetricsCreationListener taskMetricsListener;
- public MRJobEntityCreationHandler(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+ public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig) {
this.eagleServiceConfig = eagleServiceConfig;
jobMetricsListener = new JobExecutionMetricsCreationListener();
taskMetricsListener = new TaskExecutionMetricsCreationListener();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 3b31d93..9148c0c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.running.parser;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobConfig;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -77,14 +77,14 @@ public class MRJobParser implements Runnable {
private boolean first;
private Set<String> finishedTaskIds;
private List<String> configKeys;
- private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+ private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
- public MRJobParser(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
- MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+ public MRJobParser(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+ MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
List<String> configKeys) {
@@ -393,8 +393,8 @@ public class MRJobParser implements Runnable {
Comparator<MRTask> byElapsedTimeDecrease = (e1, e2) -> -1 * Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
//2, get finished bottom n
Iterator<MRTask> taskIteratorIncrease = tasks.stream()
- .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
- .sorted(byElapsedTimeIncrease).iterator();
+ .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+ .sorted(byElapsedTimeIncrease).iterator();
int i = 0;
while (taskIteratorIncrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
MRTask mrTask = taskIteratorIncrease.next();
@@ -405,8 +405,8 @@ public class MRJobParser implements Runnable {
}
//3, fetch finished top n
Iterator<MRTask> taskIteratorDecrease = tasks.stream()
- .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
- .sorted(byElapsedTimeDecrease).iterator();
+ .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+ .sorted(byElapsedTimeDecrease).iterator();
i = 0;
while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
MRTask mrTask = taskIteratorDecrease.next();
@@ -417,8 +417,8 @@ public class MRJobParser implements Runnable {
}
//4, fetch running top n
taskIteratorDecrease = tasks.stream()
- .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
- .sorted(byElapsedTimeDecrease).iterator();
+ .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
+ .sorted(byElapsedTimeDecrease).iterator();
i = 0;
while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
MRTask mrTask = taskIteratorDecrease.next();
@@ -560,12 +560,12 @@ public class MRJobParser implements Runnable {
//we must flush entities before delete from zk in case of missing finish state of jobs
//delete from zk if needed
mrJobEntityMap.keySet()
- .stream()
- .filter(
- jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
- || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
- .forEach(
- jobId -> this.runningJobManager.delete(app.getId(), jobId));
+ .stream()
+ .filter(
+ jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
+ || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
+ .forEach(
+ jobId -> this.runningJobManager.delete(app.getId(), jobId));
}
LOG.info("finish process yarn application " + app.getId());
@@ -575,4 +575,4 @@ public class MRJobParser implements Runnable {
}
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 75650b7..50b4726 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -18,11 +18,13 @@
package org.apache.eagle.jpm.mr.running.recover;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -30,7 +32,7 @@ import java.util.Map;
public class MRRunningJobManager implements Serializable {
private RunningJobManager runningJobManager;
- public MRRunningJobManager(MRRunningConfigManager.ZKStateConfig config) {
+ public MRRunningJobManager(MRRunningJobConfig.ZKStateConfig config) {
this.runningJobManager = new RunningJobManager(config.zkQuorum,
config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index ebb9144..268912c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -18,13 +18,7 @@
package org.apache.eagle.jpm.mr.running.storm;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
@@ -32,6 +26,13 @@ import org.apache.eagle.jpm.util.Utils;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,17 +41,18 @@ import java.util.*;
public class MRRunningJobFetchSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobFetchSpout.class);
- private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
- private MRRunningConfigManager.EndpointConfig endpointConfig;
- private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+ private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+ private MRRunningJobConfig.EndpointConfig endpointConfig;
+ private MRRunningJobConfig.ZKStateConfig zkStateConfig;
private ResourceFetcher resourceFetcher;
private SpoutOutputCollector collector;
private boolean init;
private transient MRRunningJobManager runningJobManager;
private Set<String> runningYarnApps;
- public MRRunningJobFetchSpout(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
- MRRunningConfigManager.EndpointConfig endpointConfig,
- MRRunningConfigManager.ZKStateConfig zkStateConfig) {
+
+ public MRRunningJobFetchSpout(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+ MRRunningJobConfig.EndpointConfig endpointConfig,
+ MRRunningJobConfig.ZKStateConfig zkStateConfig) {
this.jobExtractorConfig = jobExtractorConfig;
this.endpointConfig = endpointConfig;
this.zkStateConfig = zkStateConfig;
@@ -168,4 +170,4 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
@Override
public void close() {
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 0dccd70..3174eb1 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -18,12 +18,7 @@
package org.apache.eagle.jpm.mr.running.storm;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -31,6 +26,12 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.topology.base.BaseRichBolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,19 +42,20 @@ import java.util.concurrent.Executors;
public class MRRunningJobParseBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobParseBolt.class);
- private MRRunningConfigManager.EndpointConfig endpointConfig;
- private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
- private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+ private MRRunningJobConfig.EndpointConfig endpointConfig;
+ private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+ private MRRunningJobConfig.ZKStateConfig zkStateConfig;
private ExecutorService executorService;
private Map<String, MRJobParser> runningMRParsers;
private transient MRRunningJobManager runningJobManager;
- private MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+ private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
private ResourceFetcher resourceFetcher;
private List<String> configKeys;
- public MRRunningJobParseBolt(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
- MRRunningConfigManager.EndpointConfig endpointConfig,
- MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
- MRRunningConfigManager.ZKStateConfig zkStateConfig,
+
+ public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
+ MRRunningJobConfig.EndpointConfig endpointConfig,
+ MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+ MRRunningJobConfig.ZKStateConfig zkStateConfig,
List<String> configKeys) {
this.eagleServiceConfig = eagleServiceConfig;
this.endpointConfig = endpointConfig;
@@ -96,8 +98,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
LOG.info("remove parser {}", appId);
});
- if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
- applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
+ if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
+ || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING);
executorService.execute(applicationParser);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
new file mode 100644
index 0000000..5f809f3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>MR_RUNNING_JOB_APP</type>
+ <name>MR Running Job Monitoring</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.jpm.mr.running.MRRunningJobApplication</appClass>
+ <viewPath>/apps/jpm</viewPath>
+ <configuration>
+ <!-- org.apache.eagle.jpm.mr.running.MRRunningJobConfig -->
+ <property>
+ <name>envContextConfig.env</name>
+ <value>local</value>
+ <displayName>Environment</displayName>
+ <description>Execution environment</description>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkQuorum</name>
+ <displayName>zkQuorum</displayName>
+ <description>Zookeeper Quorum</description>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkPort</name>
+ <displayName>zkPort</displayName>
+ <description>Zookeeper Port</description>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkSessionTimeoutMs</name>
+ <displayName>zkSessionTimeoutMs</displayName>
+ <description>Zookeeper session timeoutMs</description>
+ <value>15000</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryTimes</name>
+ <displayName>zkRetryTimes</displayName>
+ <description>zookeeperConfig.zkRetryTimes</description>
+ <value>3</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryInterval</name>
+ <displayName>zkRetryInterval</displayName>
+ <description>zookeeperConfig.zkRetryInterval</description>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRoot</name>
+ <value>/apps/mr/running</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <description>eagleProps.eagleService.host</description>
+ <value>sandbox.hortonworks.com</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <description>eagleProps.eagleService.port</description>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <description>eagleProps.eagleService.username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <description>eagleProps.eagleService.password</description>
+ <value>secret</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.readTimeOutSeconds</name>
+ <description>eagleProps.eagleService.readTimeOutSeconds</description>
+ <value>20</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.maxFlushNum</name>
+ <description>eagleProps.eagleService.maxFlushNum</description>
+ <value>500</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.site</name>
+ <description>jobExtractorConfig.site</description>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.fetchRunningJobInterval</name>
+ <description>jobExtractorConfig.fetchRunningJobInterval</description>
+ <value>15</value>
+ </property>
+ <property> <!-- key must match the one MRRunningJobConfig reads via config.getInt(...) -->
+ <name>jobExtractorConfig.parseJobThreadPoolSize</name>
+ <description>jobExtractorConfig.parseJobThreadPoolSize</description>
+ <value>5</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.topAndBottomTaskByElapsedTime</name>
+ <description>jobExtractorConfig.topAndBottomTaskByElapsedTime</description>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.rmUrls</name>
+ <description>dataSourceConfig.rmUrls</description>
+ <value>http://sandbox.hortonworks.com:50030</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.nnEndpoint</name>
+ <description>dataSourceConfig.nnEndpoint</description>
+ <value>hdfs://sandbox.hortonworks.com:8020</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.keytab</name>
+ <description>dataSourceConfig.keytab</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.principal</name>
+ <description>dataSourceConfig.principal</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.rmUrls</name>
+ <description>dataSourceConfig.rmUrls</description>
+ <value>http://sandbox.hortonworks.com:8088</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+ ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+ # Step 2: Set up a data collector to flow data into the kafka topic:
+
+ ./bin/logstash -f log_collector.conf
+
+ ## `log_collector.conf` sample as following:
+
+ input {
+
+ }
+ filter {
+
+ }
+ output{
+
+ }
+
+ # Step 3: start application
+
+ # Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+ # Step 1: stop and uninstall application
+ # Step 2: delete kafka topic named "${site}_example_source_topic"
+ # Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..61ec08c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index f15fc2d..4b6d4fe 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -14,10 +14,10 @@
# limitations under the License.
{
+ "appId":"mrRunningJob",
+ "mode":"LOCAL",
"envContextConfig" : {
"env" : "local",
- "topologyName" : "mrRunningJob",
- "stormConfigFile" : "storm.yaml",
"parallelismConfig" : {
"mrRunningJobFetchSpout" : 1,
"mrRunningJobParseBolt" : 10
@@ -44,11 +44,9 @@
"zkRetryTimes" : 3,
"zkRetryInterval" : 20000
},
-
"dataSourceConfig" : {
- "rmUrls": ["http://sandbox.hortonworks.com:50030"]
+ "rmUrls": "http://sandbox.hortonworks.com:50030"
},
-
"eagleProps" : {
"mailHost" : "abc.com",
"mailDebug" : "true",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
new file mode 100644
index 0000000..2b7d429
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.AppJUnitRunner;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AppJUnitRunner.class)
+public class MRRunningJobApplicationProviderTest {
+ @Inject
+ private
+ ApplicationSimulator simulator;
+
+ @Test
+ public void testStartAsManagedApplication(){
+ simulator.start(MRRunningJobApplicationProvider.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
new file mode 100644
index 0000000..3ec9089
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class MRRunningJobApplicationTest {
+ @Test
+ public void testRunApplicationWithCLI(){
+ new MRRunningJobApplication().run(ConfigFactory.load());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index be90456..3e487ae 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,28 +18,30 @@
package org.apache.eagle.service.jpm;
-import org.apache.commons.lang.time.StopWatch;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+
import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.generic.GenericEntityServiceResource;
+
+import org.apache.commons.lang.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.*;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
-import java.util.*;
-
-import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
@Path("mrJobs")
public class MRJobExecutionResource {
GenericEntityServiceResource resource = new GenericEntityServiceResource();
- public final static String ELAPSEDMS = "elapsedms";
- public final static String TOTAL_RESULTS = "totalResults";
+ public static final String ELAPSEDMS = "elapsedms";
+ public static final String TOTAL_RESULTS = "totalResults";
- private final static Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class);
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -57,7 +59,7 @@ public class MRJobExecutionResource {
List<TaggedLogAPIEntity> jobs = new ArrayList<>();
List<TaggedLogAPIEntity> finishedJobs = new ArrayList<>();
Set<String> jobIds = new HashSet<>();
- Map<String,Object> meta = new HashMap<>();
+ final Map<String,Object> meta = new HashMap<>();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -134,7 +136,7 @@ public class MRJobExecutionResource {
List<TaggedLogAPIEntity> jobs = new ArrayList<>();
Set<String> jobIds = new HashSet<>();
String condition = buildCondition(jobId, jobDefId, site);
- int pageSize = Integer.MAX_VALUE;
+ final int pageSize = Integer.MAX_VALUE;
if (condition == null) {
response.setException(new Exception("Search condition is empty"));
response.setSuccess(false);
@@ -142,7 +144,7 @@ public class MRJobExecutionResource {
}
LOG.debug("search condition=" + condition);
- Map<String,Object> meta = new HashMap<>();
+ final Map<String,Object> meta = new HashMap<>();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String queryFormat = "%s[%s]{*}";
@@ -245,11 +247,11 @@ public class MRJobExecutionResource {
List<Long> times = parseTimeList(timeList);
String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
- GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> history_res =
+ GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes =
resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
- if (history_res.isSuccess() && history_res.getObj() != null && history_res.getObj().size() > 0) {
+ if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) {
initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator());
- for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : history_res.getObj()) {
+ for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) {
int index = getPosition(times, o.getDuration());
MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
counter.taskCount++;
@@ -257,11 +259,11 @@ public class MRJobExecutionResource {
}
} else {
query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
- GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> running_res =
+ GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes =
resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
- if (running_res.isSuccess() && running_res.getObj() != null) {
+ if (runningRes.isSuccess() && runningRes.getObj() != null) {
initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator());
- for (TaskExecutionAPIEntity o : running_res.getObj()) {
+ for (TaskExecutionAPIEntity o : runningRes.getObj()) {
int index = getPosition(times, o.getDuration());
if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index ccdfe79..7162bac 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -18,12 +18,6 @@
package org.apache.eagle.jpm.spark.running.storm;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
@@ -31,6 +25,13 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
try {
Thread.sleep(jobExtractorConfig.fetchRunningJobInterval * 1000);
} catch (Exception e) {
+ // ignored
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index c41804b..a497e29 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -53,6 +53,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
private ExecutorService executorService;
private Map<String, SparkApplicationParser> runningSparkParsers;
private ResourceFetcher resourceFetcher;
+
public SparkRunningJobParseBolt(SparkRunningJobAppConfig.ZKStateConfig zkStateConfig,
SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
SparkRunningJobAppConfig.EndpointConfig endpointConfig,
@@ -94,8 +95,8 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
LOG.info("remove parser {}", appId);
});
- if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
- applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+ if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
+ || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
executorService.execute(applicationParser);
}