You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:35 UTC
[7/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr &
spark running job monitoring
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
index 2684899..e0ec330 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
@@ -18,8 +18,8 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -27,12 +27,12 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa")
@ColumnFamily("f")
@Prefix("jexec")
-@Service(JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_JOB_EXECUTION_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
@Indexes({
- @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true),
- @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false)
+ @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
+ @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
})
public class JobExecutionAPIEntity extends JobBaseAPIEntity {
@Column("a")
@@ -55,6 +55,22 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
private int numFinishedReduces;
@Column("j")
private JobCounters jobCounters;
+ @Column("k")
+ private int dataLocalMaps;
+ @Column("l")
+ private double dataLocalMapsPercentage;
+ @Column("m")
+ private int rackLocalMaps;
+ @Column("n")
+ private double rackLocalMapsPercentage;
+ @Column("o")
+ private int totalLaunchedMaps;
+ @Column("p")
+ private long submissionTime;
+ @Column("q")
+ private long lastMapDuration;
+ @Column("r")
+ private long lastReduceDuration;
public String getCurrentState() {
return currentState;
@@ -129,4 +145,76 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
this.jobCounters = jobCounters;
_pcs.firePropertyChange("jobCounters", null, null);
}
+
+ public int getDataLocalMaps() {
+ return dataLocalMaps;
+ }
+
+ public void setDataLocalMaps(int dataLocalMaps) {
+ this.dataLocalMaps = dataLocalMaps;
+ valueChanged("dataLocalMaps");
+ }
+
+ public double getDataLocalMapsPercentage() {
+ return dataLocalMapsPercentage;
+ }
+
+ public void setDataLocalMapsPercentage(double dataLocalMapsPercentage) {
+ this.dataLocalMapsPercentage = dataLocalMapsPercentage;
+ valueChanged("dataLocalMapsPercentage");
+ }
+
+ public int getRackLocalMaps() {
+ return rackLocalMaps;
+ }
+
+ public void setRackLocalMaps(int rackLocalMaps) {
+ this.rackLocalMaps = rackLocalMaps;
+ valueChanged("rackLocalMaps");
+ }
+
+ public double getRackLocalMapsPercentage() {
+ return rackLocalMapsPercentage;
+ }
+
+ public void setRackLocalMapsPercentage(double rackLocalMapsPercentage) {
+ this.rackLocalMapsPercentage = rackLocalMapsPercentage;
+ valueChanged("rackLocalMapsPercentage");
+ }
+
+ public int getTotalLaunchedMaps() {
+ return totalLaunchedMaps;
+ }
+
+ public void setTotalLaunchedMaps(int totalLaunchedMaps) {
+ this.totalLaunchedMaps = totalLaunchedMaps;
+ valueChanged("totalLaunchedMaps");
+ }
+
+ public long getSubmissionTime() {
+ return submissionTime;
+ }
+
+ public void setSubmissionTime(long submissionTime) {
+ this.submissionTime = submissionTime;
+ valueChanged("submissionTime");
+ }
+
+ public long getLastMapDuration() {
+ return lastMapDuration;
+ }
+
+ public void setLastMapDuration(long lastMapDuration) {
+ this.lastMapDuration = lastMapDuration;
+ valueChanged("lastMapDuration");
+ }
+
+ public long getLastReduceDuration() {
+ return lastReduceDuration;
+ }
+
+ public void setLastReduceDuration(long lastReduceDuration) {
+ this.lastReduceDuration = lastReduceDuration;
+ valueChanged("lastReduceDuration");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
index 2400c55..9e8a372 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa_process")
@ColumnFamily("f")
@Prefix("process")
-@Service(JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME)
+@Service(Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME)
@TimeSeries(true)
@Partition({"site"})
public class JobProcessTimeStampEntity extends TaggedLogAPIEntity {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
index 9769620..929a98f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa_anomaly")
@ColumnFamily("f")
@Prefix("tacount")
-@Service(JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME)
+@Service(Constants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
index 77994a5..abc28e2 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
@@ -18,8 +18,8 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa_task")
@ColumnFamily("f")
@Prefix("taexec")
-@Service(JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
@Indexes({
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
index f287688..c1f71b8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
@@ -18,8 +18,8 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa_task")
@ColumnFamily("f")
@Prefix("texec")
-@Service(JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_TASK_EXECUTION_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
public class TaskExecutionAPIEntity extends JobBaseAPIEntity {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
index 5ae67c0..7456522 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.entities;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.meta.*;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@Table("eaglejpa_anomaly")
@ColumnFamily("f")
@Prefix("taskfailurecount")
-@Service(JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
+@Service(Constants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
public class TaskFailureCountAPIEntity extends JobBaseAPIEntity {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
deleted file mode 100644
index 1c1c759..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure
- * counters.
- *
- */
-public final class CounterGroupDictionary {
-
- private final List<CounterGroupKey> groupKeys = new ArrayList<>();
-
- private static volatile CounterGroupDictionary instance = null;
- private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
-
- private CounterGroupDictionary() {}
-
- public static CounterGroupDictionary getInstance() throws JobCounterException {
- if (instance == null) {
- synchronized (CounterGroupDictionary.class) {
- if (instance == null) {
- CounterGroupDictionary tmp = new CounterGroupDictionary();
- tmp.initialize();
- instance = tmp;
- }
- }
- }
- return instance;
- }
-
- public CounterGroupKey getCounterGroupByName(String groupName) {
- for (CounterGroupKey groupKey : groupKeys) {
- if (groupKey.getName().equalsIgnoreCase(groupName)) {
- return groupKey;
- }
- }
- return null;
- }
-
- public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
- if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
- return null;
- }
- return groupKeys.get(groupIndex);
- }
-
- private void initialize() throws JobCounterException {
- // load config.properties file from classpath
- InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
- try {
- if (is == null) {
- is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
- if (is == null) {
- final String errMsg = "Failed to load JobCounter.conf";
- LOG.error(errMsg);
- throw new JobCounterException(errMsg);
- }
- }
- final Properties prop = new Properties();
- try {
- prop.load(is);
- } catch(Exception ex) {
- final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
- LOG.error(errMsg, ex);
- throw new JobCounterException(errMsg, ex);
- }
- int groupIndex = 0;
- while (parseGroup(groupIndex, prop)) {
- ++groupIndex;
- }
- } finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- }
- }
- }
- }
-
- private boolean parseGroup(int groupIndex, Properties prop) {
- final String groupKeyBase = "counter.group" + groupIndex;
- final String groupNameKey = groupKeyBase + ".name";
- final String groupName = prop.getProperty(groupNameKey);
-
- if (groupName == null) {
- return false;
- }
-
- final String groupDescriptionKey = groupKeyBase + ".description";
- final String groupDescription = prop.getProperty(groupDescriptionKey);
- final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
- final ArrayList<CounterKey> counters = new ArrayList<CounterKey>();
-
- int counterIndex = 0;
- while (parseCounter(groupKey, counterIndex, counters, prop)) {
- ++counterIndex;
- }
- groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
- groupKeys.add(groupKey);
- return true;
- }
-
- private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
- final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
- final String counterNameKey = counterKeyBase + ".names";
- final String counterNamesString = prop.getProperty(counterNameKey);
-
- if (counterNamesString == null) {
- return false;
- }
- final String[] names = counterNamesString.split(",");
- final List<String> counterNames = new ArrayList<String>();
- for (String name : names) {
- counterNames.add(name.trim());
- }
-
- final String counterDescriptionKey = counterKeyBase + ".description";
- final String counterDescription = prop.getProperty(counterDescriptionKey);
-
- CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
- counters.add(counter);
- return true;
- }
-
- private static class CounterKeyImpl implements CounterKey {
- private final int index;
- private final List<String> counterNames;
- private final String description;
- private final CounterGroupKey groupKey;
-
- public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
- this.index = index;
- this.counterNames = counterNames;
- this.description = description;
- this.groupKey = groupKey;
- }
- @Override
- public int getIndex() {
- return index;
- }
- @Override
- public List<String> getNames() {
- return counterNames;
- }
- @Override
- public String getDescription() {
- return description;
- }
- @Override
- public CounterGroupKey getGroupKey() {
- return groupKey;
- }
- }
-
- private static class CounterGroupKeyImpl implements CounterGroupKey {
- private final int index;
- private final String name;
- private final String description;
- private CounterKey[] counterKeys;
-
- public CounterGroupKeyImpl(int index, String name, String description) {
- this.index = index;
- this.name = name;
- this.description = description;
- }
-
- public void setCounterKeys(CounterKey[] counterKeys) {
- this.counterKeys = counterKeys;
- }
-
- @Override
- public int getIndex() {
- return index;
- }
- @Override
- public String getName() {
- return name;
- }
- @Override
- public String getDescription() {
- return description;
- }
- @Override
- public int getCounterNumber() {
- return counterKeys.length;
- }
- @Override
- public List<CounterKey> listCounterKeys() {
- return Arrays.asList(counterKeys);
- }
- @Override
- public CounterKey getCounterKeyByName(String name) {
- for (CounterKey counterKey : counterKeys) {
- for (String n : counterKey.getNames()) {
- if (n.equalsIgnoreCase(name)) {
- return counterKey;
- }
- }
- }
- return null;
- }
- @Override
- public CounterKey getCounterKeyByID(int index) {
- if (index < 0 || index >= counterKeys.length) {
- return null;
- }
- return counterKeys[index];
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
deleted file mode 100644
index 82606d1..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.List;
-
-public interface CounterGroupKey {
-
- String getName();
- String getDescription();
- int getIndex();
- int getCounterNumber();
- List<CounterKey> listCounterKeys();
- CounterKey getCounterKeyByName(String name);
- CounterKey getCounterKeyByID(int index);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
deleted file mode 100644
index 161490f..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.List;
-
-public interface CounterKey {
-
- List<String> getNames();
- String getDescription();
- int getIndex();
- CounterGroupKey getGroupKey();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
deleted file mode 100644
index 5ffaf51..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-public class JobCounterException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = -4525162176188266862L;
-
- /**
- * Default constructor of JobCounterException
- */
- public JobCounterException() {
- super();
- }
-
- /**
- * Constructor of JobCounterException
- *
- * @param message error message
- */
- public JobCounterException(String message) {
- super(message);
- }
-
- /**
- * Constructor of JobCounterException
- *
- * @param message error message
- * @param cause the cause of the exception
- *
- */
- public JobCounterException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * Constructor of JobCounterException
- *
- * @param cause the cause of the exception
- */
- public JobCounterException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
deleted file mode 100644
index 2806cf1..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public final class JobCounters {
-
- private Map<String, Map<String, Long>> counters = new TreeMap<>();
-
- public Map<String, Map<String, Long>> getCounters() {
- return counters;
- }
-
- public void setCounters(Map<String, Map<String, Long>> counters) {
- this.counters = counters;
- }
-
- public String toString(){
- return counters.toString();
- }
-
- public void clear() {
- for (Map.Entry<String, Map<String, Long>> entry : counters.entrySet()) {
- entry.getValue().clear();
- }
- counters.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
deleted file mode 100644
index 9d13fb4..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.parser;
-
-public enum EagleJobTagName {
- SITE("site"),
- RACK("rack"),
- HOSTNAME("hostname"),
- JOB_NAME("jobName"),
- NORM_JOB_NAME("normJobName"),
- JOB_ID("jobID"),
- TASK_ID("taskID"),
- TASK_ATTEMPT_ID("taskAttemptID"),
- JOB_STATUS("jobStatus"),
- USER("user"),
- TASK_TYPE("taskType"),
- TASK_EXEC_TYPE("taskExecType"),
- ERROR_CATEGORY("errorCategory"),
- JOB_QUEUE("queue"),
- RULE_TYPE("ruleType"),
- JOB_TYPE("jobType");
-
- private String tagName;
- private EagleJobTagName(String tagName) {
- this.tagName = tagName;
- }
-
- public String toString() {
-
- return this.tagName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 31bfdb5..6442699 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,11 +18,13 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.jpm.mr.history.entities.JobConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.entities.*;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.JobNameNormalization;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -56,10 +58,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
// hostname to rack mapping
protected Map<String, String> m_host2RackMapping;
- protected String m_jobID;
+ protected String m_jobId;
protected String m_jobName;
protected String m_jobType;
- protected String m_normJobName;
+ protected String m_jobDefId;
protected String m_user;
protected String m_queueName;
protected Long m_jobLauchTime;
@@ -69,12 +71,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
protected final Configuration configuration;
- public JPAConstants.JobType fetchJobType(Configuration config) {
- if (config.get(JPAConstants.JobConfiguration.CASCADING_JOB) != null) { return JPAConstants.JobType.CASCADING; }
- if (config.get(JPAConstants.JobConfiguration.HIVE_JOB) != null) { return JPAConstants.JobType.HIVE; }
- if (config.get(JPAConstants.JobConfiguration.PIG_JOB) != null) { return JPAConstants.JobType.PIG; }
- if (config.get(JPAConstants.JobConfiguration.SCOOBI_JOB) != null) {return JPAConstants.JobType.SCOOBI; }
- return JPAConstants.JobType.NOTAVALIABLE;
+ public Constants.JobType fetchJobType(Configuration config) {
+ if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; }
+ if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; }
+ if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; }
+ if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; }
+ return Constants.JobType.NOTAVALIABLE;
}
/**
@@ -121,7 +123,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
public void close() throws IOException {
// check if this job history file is complete
if (m_jobExecutionEntity.getEndTime() == 0L) {
- throw new IOException(new JHFWriteNotCompletedException(m_jobID));
+ throw new IOException(new JHFWriteNotCompletedException(m_jobId));
}
try {
flush();
@@ -142,7 +144,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
* @param id
*/
private void setJobID(String id) {
- this.m_jobID = id;
+ this.m_jobId = id;
}
private void setJobType(String jobType) {
@@ -152,10 +154,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
String id = values.get(Keys.JOBID);
- if (m_jobID == null) {
+ if (m_jobId == null) {
setJobID(id);
- } else if (!m_jobID.equals(id)) {
- String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobID + "'";
+ } else if (!m_jobId.equals(id)) {
+ String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
LOG.error(msg);
throw new ImportException(msg);
}
@@ -165,51 +167,63 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
m_user = values.get(Keys.USER);
m_queueName = values.get(Keys.JOB_QUEUE);
m_jobName = values.get(Keys.JOBNAME);
- m_normJobName = m_jobName;
+ m_jobDefId = m_jobName;
- LOG.info("NormJobName of " + id + ": " + m_normJobName);
+ // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
+ String jobDefId = null;
+ if(configuration != null ) jobDefId = configuration.get(Constants.JOB_DEFINITION_ID_KEY);
- m_jobSubmitEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
- m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
- m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobSubmitEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+ if(jobDefId == null) {
+ m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
+ } else {
+ LOG.debug("Got normJobName from job configuration for " + id + ": " + jobDefId);
+ m_jobDefId = jobDefId;
+ }
+
+ LOG.info("NormJobName of " + id + ": " + m_jobDefId);
+
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
entityCreated(m_jobSubmitEventEntity);
} else if(values.get(Keys.LAUNCH_TIME) != null) { // job launched
m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
entityCreated(m_jobLaunchEventEntity);
} else if(values.get(Keys.FINISH_TIME) != null) { // job finished
m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
entityCreated(m_jobFinishEventEntity);
// populate jobExecutionEntity entity
- m_jobExecutionEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_QUEUE.toString(), m_queueName);
- m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+ m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
if (values.get(Keys.FAILED_MAPS) != null) {
// for Artemis
m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
@@ -223,7 +237,27 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
- m_jobExecutionEntity.setJobCounters(parseCounters(totalCounters));
+ JobCounters jobCounters = parseCounters(totalCounters);
+ m_jobExecutionEntity.setJobCounters(jobCounters);
+ if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
+ Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
+ if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+ m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
+ }
+
+ if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+ m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
+ }
+
+ if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+ m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
+ }
+ }
+
+ if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
+ m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+ m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+ }
}
entityCreated(m_jobExecutionEntity);
}
@@ -261,13 +295,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
final String taskID = values.get(Keys.TASKID);
Map<String, String> taskBaseTags = new HashMap<String, String>(){{
- put(EagleJobTagName.TASK_TYPE.toString(), taskType);
- put(EagleJobTagName.USER.toString(), m_user);
- //put(EagleJobTagName.JOB_NAME.toString(), _jobName);
- put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
- put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- put(EagleJobTagName.TASK_ID.toString(), taskID);
+ put(MRJobTagName.TASK_TYPE.toString(), taskType);
+ put(MRJobTagName.USER.toString(), m_user);
+ //put(MRJobTagName.JOB_NAME.toString(), _jobName);
+ put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
+ put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ put(MRJobTagName.TASK_ID.toString(), taskID);
}};
taskBaseTags.putAll(m_baseTags);
if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet
@@ -278,8 +312,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags);
String hostname = m_taskRunningHosts.get(taskID);
hostname = (hostname == null) ? "" : hostname; // TODO if task fails, then no hostname
- taskExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
- taskExecutionTags.put(EagleJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
+ taskExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
+ taskExecutionTags.put(MRJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
entity.setTags(taskExecutionTags);
entity.setStartTime(m_taskStartTime.get(taskID));
entity.setEndTime(Long.valueOf(finishTime));
@@ -290,6 +324,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
if (values.get(Keys.COUNTERS) != null || counters != null) {
entity.setJobCounters(parseCounters(counters));
}
+ long duration = entity.getEndTime() - m_jobSubmitEventEntity.getTimestamp();
+ if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > m_jobExecutionEntity.getLastMapDuration()) {
+ m_jobExecutionEntity.setLastMapDuration(duration);
+ }
+ if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > m_jobExecutionEntity.getLastReduceDuration()) {
+ m_jobExecutionEntity.setLastReduceDuration(duration);
+ }
entityCreated(entity);
//_taskStartTime.remove(taskID); // clean this taskID
} else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start
@@ -300,8 +341,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
entity.setTags(taskAttemptExecutionTags);
String hostname = values.get(Keys.HOSTNAME);
String rack = values.get(Keys.RACK);
- taskAttemptExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
- taskAttemptExecutionTags.put(EagleJobTagName.RACK.toString(), rack);
+ taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
+ taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack);
// put last attempt's hostname to task level
m_taskRunningHosts.put(taskID, hostname);
// it is very likely that an attempt ID could be both succeeded and failed due to M/R system
@@ -345,17 +386,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
if (matchMustHaveKeyPatterns(prop)) {
JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity();
jobConfigurationEntity.setTags(new HashMap<>(m_baseTags));
- jobConfigurationEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
- jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
- jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
- jobConfigurationEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
- jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
+ jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
JobConfig jobConfig = new JobConfig();
jobConfig.setConfig(prop);
jobConfigurationEntity.setJobConfig(jobConfig);
- jobConfigurationEntity.setConfigJobName(m_normJobName);
+ jobConfigurationEntity.setConfigJobName(m_jobDefId);
entityCreated(jobConfigurationEntity);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 7fe8909..278deca 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 6fd2c18..2d960b0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,15 +19,12 @@
package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.jobhistory.*;
-import org.apache.hadoop.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index e485cc8..38ca35c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -20,11 +20,11 @@ package org.apache.eagle.jpm.mr.history.parser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,16 +83,16 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>();
- mapTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
- counters.put(JPAConstants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+ mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
+ counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>();
- reduceTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
- counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+ reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
+ counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
- counters.put(JPAConstants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
- counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+ counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
+ counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
jobCounters.setCounters(counters);
@@ -105,18 +105,18 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) {
JobCounters jobCounters = entity.getJobCounters();
- String taskType = entity.getTags().get(JPAConstants.JOB_TASK_TYPE_TAG);
+ String taskType = entity.getTags().get(Constants.JOB_TASK_TYPE_TAG);
if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) {
- if (JPAConstants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
+ if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
m_mapAttemptDuration += entity.getDuration();
- this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
- this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+ this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+ this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
return;
- } else if (JPAConstants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
+ } else if (Constants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
m_reduceAttemptDuration += entity.getDuration();
- this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
- this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+ this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+ this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 23e7072..94de068 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.TaskAttemptCounterAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
@@ -74,12 +75,12 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
Map<String, String> tags = new HashMap<>();
- tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString()));
- tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString()));
- tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString()));
- tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString()));
- tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString()));
- tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString()));
+ tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+ tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+ tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+ tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+ tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+ tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
CounterKey key = new CounterKey();
key.tags = tags;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 14cc882..177fdc1 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
import org.apache.eagle.jpm.mr.history.entities.TaskFailureCountAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
@@ -80,17 +81,17 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
Map<String, String> tags = new HashMap<>();
failureTask.setTags(tags);
- tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString()));
- tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString()));
- tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString()));
- tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString()));
- tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString()));
- tags.put(EagleJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
- tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString()));
+ tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+ tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+ tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+ tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+ tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+ tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+ tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
//TODO need optimize, match and then capture the data
final String errCategory = classifier.classifyError(e.getError());
- tags.put(EagleJobTagName.ERROR_CATEGORY.toString(), errCategory);
+ tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
failureTask.setError(e.getError());
failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
index db62cfb..3a08c52 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
@@ -110,6 +110,9 @@ counter.group2.counter10.names = SLOTS_MILLIS_MAPS
counter.group2.counter10.description = Total vcore-seconds taken by all reduce tasks
counter.group2.counter11.names = SLOTS_MILLIS_REDUCES
counter.group2.counter11.description = Total vcore-seconds taken by all reduce tasks
+counter.group2.counter12.names = RACK_LOCAL_MAPS
+counter.group2.counter12.description = Total vcore-seconds taken by all reduce tasks
+
counter.group3.name = MapTaskAttemptCounter
counter.group3.description = Reduce Task Attempt Counter Aggregation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 8cb1aa3..1b97271 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -62,7 +62,7 @@
"password": "secret"
}
},
-
+
"MRConfigureKeys" : [
"mapreduce.map.output.compress",
"mapreduce.map.output.compress.codec",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
new file mode 100644
index 0000000..3c8aa92
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-jpm-mr-running</artifactId>
+ <name>eagle-jpm-mr-running</name>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.orbit.com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.orbit.com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-job-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jsoup</groupId>
+ <artifactId>jsoup</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/assembly/eagle-jpm-mr-running-assembly.xml</descriptor>
+ <finalName>eagle-jpm-mr-running-${project.version}</finalName>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <tarLongFileMode>posix</tarLongFileMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
new file mode 100644
index 0000000..66133a0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>assembly</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>false</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <!--includes>
+ <include>org.apache.hadoop:hadoop-common</include>
+ <include>org.apache.hadoop:hadoop-hdfs</include>
+ <include>org.apache.hadoop:hadoop-client</include>
+ <include>org.apache.hadoop:hadoop-auth</include>
+ <include>org.apache.eagle:eagle-stream-process-api</include>
+ <include>org.apache.eagle:eagle-stream-process-base</include>
+ <include>org.jsoup:jsoup</include>
+ </includes-->
+ <excludes>
+ <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+ <exclude>asm:asm</exclude>
+ <exclude>org.apache.storm:storm-core</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.outputDirectory}/</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--<includes>-->
+ <!--<include>*.conf</include>-->
+ <!--<include>*.xml</include>-->
+ <!--<include>*.properties</include>-->
+ <!--<include>*.config</include>-->
+ <!--<include>classes/META-INF/*</include>-->
+ <!--</includes>-->
+
+ <excludes>
+ <exclude>*.yaml</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
new file mode 100644
index 0000000..fb8b805
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class MRRunningJobMain {
+ public static void main(String[] args) {
+
+ try {
+ //1. trigger init conf
+ MRRunningConfigManager mrRunningConfigManager = MRRunningConfigManager.getInstance(args);
+
+ List<String> confKeyKeys = mrRunningConfigManager.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
+ confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+ confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+
+ //2. init topology
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ String topologyName = mrRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
+ String spoutName = "mrRunningJobFetchSpout";
+ String boltName = "mrRunningJobParseBolt";
+ int parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+ int tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setSpout(
+ spoutName,
+ new MRRunningJobFetchSpout(
+ mrRunningConfigManager.getJobExtractorConfig(),
+ mrRunningConfigManager.getEndpointConfig(),
+ mrRunningConfigManager.getZkStateConfig()),
+ parallelism
+ ).setNumTasks(tasks);
+
+ parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
+ tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setBolt(boltName,
+ new MRRunningJobParseBolt(
+ mrRunningConfigManager.getEagleServiceConfig(),
+ mrRunningConfigManager.getEndpointConfig(),
+ mrRunningConfigManager.getJobExtractorConfig(),
+ mrRunningConfigManager.getZkStateConfig(),
+ confKeyKeys),
+ parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+
+ backtype.storm.Config config = new backtype.storm.Config();
+ config.setNumWorkers(mrRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
+ config.put(Config.TOPOLOGY_DEBUG, true);
+ if (!mrRunningConfigManager.getEnv().equals("local")) {
+ //cluster mode
+ //parse conf here
+ StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ } else {
+ //local mode
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
new file mode 100644
index 0000000..05e7812
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.config;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+
+import java.io.Serializable;
+
+public class MRRunningConfigManager implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
+ public String getEnv() {
+ return env;
+ }
+ private String env;
+
+ public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+ private ZKStateConfig zkStateConfig;
+
+ public EagleServiceConfig getEagleServiceConfig() {
+ return eagleServiceConfig;
+ }
+ private EagleServiceConfig eagleServiceConfig;
+
+ public JobExtractorConfig getJobExtractorConfig() {
+ return jobExtractorConfig;
+ }
+ private JobExtractorConfig jobExtractorConfig;
+
+ public EndpointConfig getEndpointConfig() {
+ return endpointConfig;
+ }
+ private EndpointConfig endpointConfig;
+
+ public static class ZKStateConfig implements Serializable {
+ public String zkQuorum;
+ public String zkRoot;
+ public int zkSessionTimeoutMs;
+ public int zkRetryTimes;
+ public int zkRetryInterval;
+ public String zkPort;
+ }
+
+ public static class EagleServiceConfig implements Serializable {
+ public String eagleServiceHost;
+ public int eagleServicePort;
+ public int readTimeoutSeconds;
+ public int maxFlushNum;
+ public String username;
+ public String password;
+ }
+
+ public static class JobExtractorConfig implements Serializable {
+ public String site;
+ public int fetchRunningJobInterval;
+ public int parseJobThreadPoolSize;
+ public int topAndBottomTaskByElapsedTime;
+ }
+
+ public static class EndpointConfig implements Serializable {
+ public String[] rmUrls;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+ private Config config;
+
+ private static MRRunningConfigManager manager = new MRRunningConfigManager();
+
+ private MRRunningConfigManager() {
+ this.eagleServiceConfig = new EagleServiceConfig();
+ this.jobExtractorConfig = new JobExtractorConfig();
+ this.endpointConfig = new EndpointConfig();
+ this.zkStateConfig = new ZKStateConfig();
+ }
+
+ public static MRRunningConfigManager getInstance(String[] args) {
+ manager.init(args);
+ return manager;
+ }
+
+ private void init(String[] args) {
+ try {
+ LOG.info("Loading from configuration file");
+ this.config = new ConfigOptionParser().load(args);
+ } catch (Exception e) {
+ LOG.error("failed to load config");
+ }
+
+ this.env = config.getString("envContextConfig.env");
+
+ //parse eagle zk
+ this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+ this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+ this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+ this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+
+ // parse eagle service endpoint
+ this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+ String port = config.getString("eagleProps.eagleService.port");
+ this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
+ this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+ this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+ this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+ this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+ //parse job extractor
+ this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+ this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+ this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
+ this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+
+ //parse data source config
+ this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
+
+ LOG.info("Successfully initialized MRRunningConfigManager");
+ LOG.info("env: " + this.env);
+ LOG.info("site: " + this.jobExtractorConfig.site);
+ LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+ LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+ }
+}