You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/24 12:23:16 UTC
[2/5] incubator-eagle git commit: [EAGLE-496] fix code style of jpm
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
index 325a92a..b263c25 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -33,12 +33,12 @@ public class HDFSUtil {
}
public static void login(Configuration kConfig) throws IOException {
- if(kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()){
+ if (kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()) {
return;
}
- kConfig.setBoolean("hadoop.security.authorization", true);
- kConfig.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(kConfig);
- UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"), kConfig.get("hdfs.keytab.file"));
- }
+ kConfig.setBoolean("hadoop.security.authorization", true);
+ kConfig.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(kConfig);
+ UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"), kConfig.get("hdfs.keytab.file"));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
index 8080147..9804a3b 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
@@ -29,37 +29,37 @@ public class JSONUtil {
return null;
}
- public static Integer getInt(JSONObject obj, String field){
- if(obj.containsKey(field)){
- return ((Long)obj.get(field)).intValue();
+ public static Integer getInt(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return ((Long) obj.get(field)).intValue();
}
return null;
}
- public static Long getLong(JSONObject obj, String field){
- if(obj.containsKey(field)){
- return (Long)obj.get(field);
+ public static Long getLong(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return (Long) obj.get(field);
}
return null;
}
- public static Boolean getBoolean(JSONObject obj, String field){
- if(obj.containsKey(field)){
- return (Boolean)obj.get(field);
+ public static Boolean getBoolean(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return (Boolean) obj.get(field);
}
return null;
}
- public static JSONObject getJSONObject(JSONObject obj, String field){
- if(obj.containsKey(field)){
- return (JSONObject)obj.get(field);
+ public static JSONObject getJSONObject(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return (JSONObject) obj.get(field);
}
return null;
}
- public static JSONArray getJSONArray(JSONObject obj, String field){
- if(obj.containsKey(field)){
- return (JSONArray)obj.get(field);
+ public static JSONArray getJSONArray(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return (JSONArray) obj.get(field);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
index c5cc82f..4e67f89 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
@@ -28,91 +28,95 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class JobNameNormalization {
- private static Logger logger = LoggerFactory.getLogger(JobNameNormalization.class);
- private static JobNameNormalization instance = new JobNameNormalization();
- private static final String JOB_NAME_NORMALIZATION_RULES_KEY = "job.name.normalization.rules.key";
- private static final String PARAMETERIZED_PREFIX = "\\$";
- private static final String MULTIPLE_RULE_DILIMITER = ";";
- /**
- * map from source string to target string
- * source string is regular expression, for example ^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$
- * target string is parameterized string, for example $1, $2
- */
- private List<JobNameNormalizationRule> _rules = new ArrayList<JobNameNormalizationRule>();
-
- private enum NormalizationOp{
- REPLACE("=>");
- private String value;
- private NormalizationOp(String value){
- this.value = value;
- }
- public String toString(){
- return value;
- }
- }
-
- static class JobNameNormalizationRule{
- Pattern pattern;
- NormalizationOp op;
- String target;
- }
-
- private JobNameNormalization(){
- try{
- // load normalization rules
- Config conf = ConfigFactory.load();
- String key = JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase();
- String value = conf.getString(key);
- if(value == null){
- logger.info("no job name normalization rules are loaded");
- return;
- }
- // multiple rules are concatenated with semicolon, i.e. ;
- String rules[] = value.split(MULTIPLE_RULE_DILIMITER);
- for(String rule : rules){
- rule = rule.trim();
- logger.info("jobNormalizationRule is loaded " + rule);
- addRule(rule);
- }
- }catch(Exception ex){
- logger.error("fail loading job name normalization rules", ex);
- throw new RuntimeException(ex);
- }
- }
-
- public static JobNameNormalization getInstance(){
- return instance;
- }
-
- private void addRule(String rule){
- for(NormalizationOp op : NormalizationOp.values()){
- // split the rule to be source and target string
- String elements[] = rule.split(op.toString());
- if(elements == null || elements.length != 2) return;
- JobNameNormalizationRule r = new JobNameNormalizationRule();
- r.pattern = Pattern.compile(elements[0].trim());
- r.op = op;
- r.target = elements[1].trim();
- _rules.add(r);
- break; //once one Op is matched, exit
- }
-
- }
-
- public String normalize(String jobName){
- String normalizedJobName = jobName;
- // go through each rules and do actions
- for(JobNameNormalizationRule rule : _rules){
- Pattern p = rule.pattern;
- Matcher m = p.matcher(jobName);
- if(m.find()){
- normalizedJobName = rule.target;
- int c = m.groupCount();
- for(int i=1; i<c+1; i++){
- normalizedJobName = normalizedJobName.replaceAll(PARAMETERIZED_PREFIX+String.valueOf(i), m.group(i));
- }
- }
- }
- return normalizedJobName;
- }
+ private static Logger LOG = LoggerFactory.getLogger(JobNameNormalization.class);
+ private static JobNameNormalization instance = new JobNameNormalization();
+ private static final String JOB_NAME_NORMALIZATION_RULES_KEY = "job.name.normalization.rules.key";
+ private static final String PARAMETERIZED_PREFIX = "\\$";
+ private static final String MULTIPLE_RULE_DILIMITER = ";";
+ /**
+ * map from source string to target string.
+ * source string is regular expression, for example ^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$
+ * target string is parameterized string, for example $1, $2
+ */
+ private List<JobNameNormalizationRule> _rules = new ArrayList<JobNameNormalizationRule>();
+
+ private enum NormalizationOp {
+ REPLACE("=>");
+ private String value;
+
+ private NormalizationOp(String value) {
+ this.value = value;
+ }
+
+ public String toString() {
+ return value;
+ }
+ }
+
+ static class JobNameNormalizationRule {
+ Pattern pattern;
+ NormalizationOp op;
+ String target;
+ }
+
+ private JobNameNormalization() {
+ try {
+ // load normalization rules
+ Config conf = ConfigFactory.load();
+ String key = JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase();
+ String value = conf.getString(key);
+ if (value == null) {
+ LOG.info("no job name normalization rules are loaded");
+ return;
+ }
+ // multiple rules are concatenated with semicolon, i.e. ;
+ String[] rules = value.split(MULTIPLE_RULE_DILIMITER);
+ for (String rule : rules) {
+ rule = rule.trim();
+ LOG.info("jobNormalizationRule is loaded " + rule);
+ addRule(rule);
+ }
+ } catch (Exception ex) {
+ LOG.error("fail loading job name normalization rules", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static JobNameNormalization getInstance() {
+ return instance;
+ }
+
+ private void addRule(String rule) {
+ for (NormalizationOp op : NormalizationOp.values()) {
+ // split the rule to be source and target string
+ String[] elements = rule.split(op.toString());
+ if (elements == null || elements.length != 2) {
+ return;
+ }
+ JobNameNormalizationRule r = new JobNameNormalizationRule();
+ r.pattern = Pattern.compile(elements[0].trim());
+ r.op = op;
+ r.target = elements[1].trim();
+ _rules.add(r);
+ break; //once one Op is matched, exit
+ }
+
+ }
+
+ public String normalize(String jobName) {
+ String normalizedJobName = jobName;
+ // go through each rules and do actions
+ for (JobNameNormalizationRule rule : _rules) {
+ Pattern p = rule.pattern;
+ Matcher m = p.matcher(jobName);
+ if (m.find()) {
+ normalizedJobName = rule.target;
+ int c = m.groupCount();
+ for (int i = 1; i < c + 1; i++) {
+ normalizedJobName = normalizedJobName.replaceAll(PARAMETERIZED_PREFIX + String.valueOf(i), m.group(i));
+ }
+ }
+ }
+ return normalizedJobName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
index ea8e4f4..9811772 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
@@ -36,11 +36,12 @@ public enum MRJobTagName {
RULE_TYPE("ruleType"),
JOB_TYPE("jobType");
- private String tagName;
+ private String tagName;
+
private MRJobTagName(String tagName) {
this.tagName = tagName;
}
-
+
public String toString() {
return this.tagName;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
index 35014b1..2098747 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
@@ -19,11 +19,11 @@ package org.apache.eagle.jpm.util;
public class SparkEntityConstant {
- public enum SPARK_STAGE_STATUS{
+ public enum SparkStageStatus {
COMPLETE, FAILED
}
- public enum SPARK_JOB_STATUS{
+ public enum SparkJobStatus {
SUCCEEDED, FAILED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
index 1d38eea..f403332 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
@@ -34,11 +34,12 @@ public enum SparkJobTagName {
private String tagName;
- private SparkJobTagName(String tagName){
+
+ private SparkJobTagName(String tagName) {
this.tagName = tagName;
}
- public String toString(){
+ public String toString() {
return this.tagName;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 12eb1b5..2696269 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -54,7 +54,7 @@ public class Utils {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSSzzz");
Date parsedDate = dateFormat.parse(date);
timestamp = parsedDate.getTime();
- } catch(ParseException e) {
+ } catch (ParseException e) {
e.printStackTrace();
}
@@ -68,21 +68,21 @@ public class Utils {
public static long parseMemory(String memory) {
if (memory.endsWith("g") || memory.endsWith("G")) {
int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * executorGB;
+ return 1024L * 1024 * 1024 * executorGB;
} else if (memory.endsWith("m") || memory.endsWith("M")) {
int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * executorMB;
+ return 1024L * 1024 * executorMB;
} else if (memory.endsWith("k") || memory.endsWith("K")) {
int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * executorKB;
+ return 1024L * executorKB;
} else if (memory.endsWith("t") || memory.endsWith("T")) {
int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * 1024 * executorTB;
+ return 1024L * 1024 * 1024 * 1024 * executorTB;
} else if (memory.endsWith("p") || memory.endsWith("P")) {
int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
- return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB;
+ return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
}
- LOG.info("Cannot parse memory info " + memory);
- return 0l;
+ LOG.info("Cannot parse memory info " + memory);
+ return 0L;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
index c8572e9..5a29be2 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
@@ -86,7 +86,7 @@ public final class CounterGroupDictionary {
final Properties prop = new Properties();
try {
prop.load(is);
- } catch(Exception ex) {
+ } catch (Exception ex) {
final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
LOG.error(errMsg, ex);
throw new JobCounterException(errMsg, ex);
@@ -100,6 +100,7 @@ public final class CounterGroupDictionary {
try {
is.close();
} catch (IOException e) {
+ LOG.warn("{}", e);
}
}
}
@@ -162,18 +163,22 @@ public final class CounterGroupDictionary {
this.description = description;
this.groupKey = groupKey;
}
+
@Override
public int getIndex() {
return index;
}
+
@Override
public List<String> getNames() {
return counterNames;
}
+
@Override
public String getDescription() {
return description;
}
+
@Override
public CounterGroupKey getGroupKey() {
return groupKey;
@@ -200,22 +205,27 @@ public final class CounterGroupDictionary {
public int getIndex() {
return index;
}
+
@Override
public String getName() {
return name;
}
+
@Override
public String getDescription() {
return description;
}
+
@Override
public int getCounterNumber() {
return counterKeys.length;
}
+
@Override
public List<CounterKey> listCounterKeys() {
return Arrays.asList(counterKeys);
}
+
@Override
public CounterKey getCounterKeyByName(String name) {
for (CounterKey counterKey : counterKeys) {
@@ -227,6 +237,7 @@ public final class CounterGroupDictionary {
}
return null;
}
+
@Override
public CounterKey getCounterKeyByID(int index) {
if (index < 0 || index >= counterKeys.length) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
index 482623a..54cc82f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
@@ -21,12 +21,17 @@ package org.apache.eagle.jpm.util.jobcounter;
import java.util.List;
public interface CounterGroupKey {
-
String getName();
+
String getDescription();
+
int getIndex();
+
int getCounterNumber();
+
List<CounterKey> listCounterKeys();
+
CounterKey getCounterKeyByName(String name);
+
CounterKey getCounterKeyByID(int index);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
index 8e4e519..7e8be35 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
@@ -21,10 +21,11 @@ package org.apache.eagle.jpm.util.jobcounter;
import java.util.List;
public interface CounterKey {
-
List<String> getNames();
+
String getDescription();
+
int getIndex();
+
CounterGroupKey getGroupKey();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounterException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounterException.java
index e60e1d4..5cab7c8 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounterException.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounterException.java
@@ -19,21 +19,17 @@
package org.apache.eagle.jpm.util.jobcounter;
public class JobCounterException extends Exception {
-
- /**
- *
- */
private static final long serialVersionUID = -4525162176188266862L;
/**
- * Default constructor of JobCounterException
+ * Default constructor of JobCounterException.
*/
public JobCounterException() {
super();
}
/**
- * Constructor of JobCounterException
+ * Constructor of JobCounterException.
*
* @param message error message
*/
@@ -42,7 +38,7 @@ public class JobCounterException extends Exception {
}
/**
- * Constructor of JobCounterException
+ * Constructor of JobCounterException.
*
* @param message error message
* @param cause the cause of the exception
@@ -53,7 +49,7 @@ public class JobCounterException extends Exception {
}
/**
- * Constructor of JobCounterException
+ * Constructor of JobCounterException.
*
* @param cause the cause of the exception
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index 5ecda8c..b4bf600 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -35,7 +35,7 @@ public final class JobCounters implements Serializable {
this.counters = counters;
}
- public String toString(){
+ public String toString() {
return counters.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCountersSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCountersSerDeser.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCountersSerDeser.java
index 89b9edd..5c485ff 100755
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCountersSerDeser.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCountersSerDeser.java
@@ -37,7 +37,7 @@ public class JobCountersSerDeser implements EntitySerDeser<JobCounters> {
return counters;
}
- final Map<String, Map<String, Long> > groupMap = counters.getCounters();
+ final Map<String, Map<String, Long>> groupMap = counters.getCounters();
int pos = 0;
final int totalGroups = Bytes.toInt(bytes, pos);
pos += 4;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 81e2886..2d1af2c 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -18,11 +18,11 @@
package org.apache.eagle.jpm.util.jobrecover;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -36,9 +36,9 @@ public class RunningJobManager implements Serializable {
public static final Logger LOG = LoggerFactory.getLogger(RunningJobManager.class);
private String zkRoot;
private CuratorFramework curator;
- private final static String ENTITY_TAGS_KEY = "entityTags";
- private final static String APP_INFO_KEY = "appInfo";
- private final static String ZNODE_LAST_FINISH_TIME = "lastFinishTime";
+ private static final String ENTITY_TAGS_KEY = "entityTags";
+ private static final String APP_INFO_KEY = "appInfo";
+ private static final String ZNODE_LAST_FINISH_TIME = "lastFinishTime";
private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) throws Exception {
return CuratorFrameworkFactory.newClient(
@@ -62,6 +62,7 @@ public class RunningJobManager implements Serializable {
.forPath(this.zkRoot);
}
} catch (Exception e) {
+ LOG.warn("{}", e);
}
}
@@ -85,7 +86,6 @@ public class RunningJobManager implements Serializable {
}
JSONObject object = new JSONObject(fields);
Map<String, Map<String, String>> parseResult = parse(object);
- Map<String, String> tags = parseResult.get(ENTITY_TAGS_KEY);
Map<String, String> appInfoMap = parseResult.get(APP_INFO_KEY);
AppInfo appInfo = new AppInfo();
@@ -110,6 +110,7 @@ public class RunningJobManager implements Serializable {
appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));
+ Map<String, String> tags = parseResult.get(ENTITY_TAGS_KEY);
result.put(jobId, Pair.of(tags, appInfo));
}
return result;
@@ -142,7 +143,6 @@ public class RunningJobManager implements Serializable {
public boolean update(String yarnAppId, String jobId, Map<String, String> tags, AppInfo app) {
String path = this.zkRoot + "/" + yarnAppId + "/" + jobId;
//InterProcessMutex lock = new InterProcessMutex(curator, path);
- Map<String, String> fields = new HashMap<>();
Map<String, String> appInfo = new HashMap<>();
appInfo.put("id", app.getId());
appInfo.put("user", app.getUser());
@@ -165,6 +165,7 @@ public class RunningJobManager implements Serializable {
appInfo.put("allocatedVCores", app.getAllocatedVCores() + "");
appInfo.put("runningContainers", app.getRunningContainers() + "");
+ Map<String, String> fields = new HashMap<>();
fields.put(ENTITY_TAGS_KEY, (new JSONObject(tags)).toString());
fields.put(APP_INFO_KEY, (new JSONObject(appInfo)).toString());
try {
@@ -263,7 +264,7 @@ public class RunningJobManager implements Serializable {
LOG.error("failed to recover last finish time {}", e);
}
- return 0l;
+ return 0L;
}
public void updateLastFinishTime(int partitionId, Long lastFinishTime) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
index 2b49f9f..eb13c3c 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -17,21 +17,20 @@
/**
*
*/
-package org.apache.eagle.jpm.util.resourceFetch;
+package org.apache.eagle.jpm.util.resourcefetch;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector;
-import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelectorImpl;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppsWrapper;
-import org.apache.eagle.jpm.util.resourceFetch.model.ClusterInfo;
-import org.apache.eagle.jpm.util.resourceFetch.model.ClusterInfoWrapper;
-import org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl;
-import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
-import org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl;
-import org.apache.eagle.jpm.util.resourceFetch.url.URLUtil;
-import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelectorImpl;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper;
+import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfoWrapper;
+import org.apache.eagle.jpm.util.resourcefetch.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourcefetch.url.SparkCompleteJobServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -43,135 +42,153 @@ import java.util.List;
public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
- private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
- private final HAURLSelector selector;
- private final ServiceURLBuilder jobListServiceURLBuilder;
- private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
- private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
-
- static {
- OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
- }
-
- public RMResourceFetcher(String[] RMBasePaths) {
- this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
- this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl();
-
- this.selector = new HAURLSelectorImpl(RMBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP);
- }
-
- private void checkUrl() throws IOException {
- if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) {
- selector.reSelectUrl();
- }
- }
-
- private List<AppInfo> doFetchFinishApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception {
- List<AppInfo> result;
- InputStream is = null;
- try {
- checkUrl();
- LOG.info("Going to call yarn api to fetch finished application list: " + urlString);
- is = InputStreamUtils.getInputStream(urlString, null, compressionType);
- final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
- if (appWrapper != null && appWrapper.getApps() != null
- && appWrapper.getApps().getApp() != null) {
- result = appWrapper.getApps().getApp();
- return result;
- }
- return null;
- } finally {
- if (is != null) { try { is.close();} catch (Exception e) { } }
- }
- }
-
- private String getSparkRunningJobURL() {
- StringBuilder sb = new StringBuilder();
- sb.append(selector.getSelectedUrl()).append("/").append(Constants.V2_APPS_URL);
- sb.append("?applicationTypes=SPARK&state=RUNNING&");
- sb.append(Constants.ANONYMOUS_PARAMETER);
- return sb.toString();
- }
-
- private String getMRRunningJobURL() {
- return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s",
- selector.getSelectedUrl(),
- Constants.V2_APPS_URL,
- Constants.ANONYMOUS_PARAMETER);
- }
-
- public String getMRFinishedJobURL(String lastFinishedTime) {
- String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
- StringBuilder sb = new StringBuilder();
- sb.append(url).append("/").append(Constants.V2_APPS_URL);
- sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=");
- sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER);
-
- return sb.toString();
- }
-
- private List<AppInfo> doFetchRunningApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception {
- List<AppInfo> result;
- InputStream is = null;
- try {
- checkUrl();
- LOG.info("Going to call yarn api to fetch running application list: " + urlString);
- is = InputStreamUtils.getInputStream(urlString, null, compressionType);
- final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
- if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) {
- result = appWrapper.getApps().getApp();
- return result;
- }
- return null;
- } finally {
- if (is != null) { try { is.close();} catch (Exception e) { } }
- }
- }
-
- private List<AppInfo> getResource(Constants.ResourceType resoureType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
- switch (resoureType) {
- case COMPLETE_SPARK_JOB:
- final String urlString = sparkCompleteJobServiceURLBuilder.build((String) parameter[0]);
- return doFetchFinishApplicationsList(urlString, compressionType);
- case RUNNING_SPARK_JOB:
- return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType);
- case RUNNING_MR_JOB:
- return doFetchRunningApplicationsList(getMRRunningJobURL(), compressionType);
- case COMPLETE_MR_JOB:
- return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType);
- default:
- throw new Exception("Not support resourceType :" + resoureType);
- }
- }
-
- public List<AppInfo> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
- try {
- return getResource(resoureType, Constants.CompressionType.GZIP, parameter);
- } catch (java.util.zip.ZipException ex) {
- return getResource(resoureType, Constants.CompressionType.NONE, parameter);
- }
- }
-
- private String getClusterInfoURL() {
- StringBuilder sb = new StringBuilder();
- sb.append(selector.getSelectedUrl()).append("/").append(Constants.YARN_API_CLUSTER_INFO).append("?" + Constants.ANONYMOUS_PARAMETER);
- return sb.toString();
- }
-
- public ClusterInfo getClusterInfo() throws Exception {
- InputStream is = null;
- try {
- checkUrl();
- final String urlString = getClusterInfoURL();
- LOG.info("Calling yarn api to fetch cluster info: " + urlString);
- is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
- final ClusterInfoWrapper clusterInfoWrapper = OBJ_MAPPER.readValue(is, ClusterInfoWrapper.class);
- if (clusterInfoWrapper != null && clusterInfoWrapper.getClusterInfo() != null) {
- return clusterInfoWrapper.getClusterInfo();
- }
- return null;
- } finally {
- if (is != null) { try { is.close();} catch (Exception e) { } }
- }
- }
+ private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
+ private final HAURLSelector selector;
+ private final ServiceURLBuilder jobListServiceURLBuilder;
+ private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public RMResourceFetcher(String[] rmBasePaths) {
+ this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+ this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl();
+
+ this.selector = new HAURLSelectorImpl(rmBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP);
+ }
+
+ private void checkUrl() throws IOException {
+ if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) {
+ selector.reSelectUrl();
+ }
+ }
+
+ private List<AppInfo> doFetchFinishApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception {
+ List<AppInfo> result;
+ InputStream is = null;
+ try {
+ checkUrl();
+ LOG.info("Going to call yarn api to fetch finished application list: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, null, compressionType);
+ final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+ if (appWrapper != null && appWrapper.getApps() != null
+ && appWrapper.getApps().getApp() != null) {
+ result = appWrapper.getApps().getApp();
+ return result;
+ }
+ return null;
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+ }
+
+ private String getSparkRunningJobURL() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(selector.getSelectedUrl()).append("/").append(Constants.V2_APPS_URL);
+ sb.append("?applicationTypes=SPARK&state=RUNNING&");
+ sb.append(Constants.ANONYMOUS_PARAMETER);
+ return sb.toString();
+ }
+
+ private String getMRRunningJobURL() {
+ return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s",
+ selector.getSelectedUrl(),
+ Constants.V2_APPS_URL,
+ Constants.ANONYMOUS_PARAMETER);
+ }
+
+ public String getMRFinishedJobURL(String lastFinishedTime) {
+ String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
+ StringBuilder sb = new StringBuilder();
+ sb.append(url).append("/").append(Constants.V2_APPS_URL);
+ sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=");
+ sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER);
+
+ return sb.toString();
+ }
+
+ private List<AppInfo> doFetchRunningApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception {
+ List<AppInfo> result;
+ InputStream is = null;
+ try {
+ checkUrl();
+ LOG.info("Going to call yarn api to fetch running application list: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, null, compressionType);
+ final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+ if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) {
+ result = appWrapper.getApps().getApp();
+ return result;
+ }
+ return null;
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+ }
+
+ private List<AppInfo> getResource(Constants.ResourceType resoureType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
+ switch (resoureType) {
+ case COMPLETE_SPARK_JOB:
+ final String urlString = sparkCompleteJobServiceURLBuilder.build((String) parameter[0]);
+ return doFetchFinishApplicationsList(urlString, compressionType);
+ case RUNNING_SPARK_JOB:
+ return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType);
+ case RUNNING_MR_JOB:
+ return doFetchRunningApplicationsList(getMRRunningJobURL(), compressionType);
+ case COMPLETE_MR_JOB:
+ return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType);
+ default:
+ throw new Exception("Not support resourceType :" + resoureType);
+ }
+ }
+
+ public List<AppInfo> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
+ try {
+ return getResource(resoureType, Constants.CompressionType.GZIP, parameter);
+ } catch (java.util.zip.ZipException ex) {
+ return getResource(resoureType, Constants.CompressionType.NONE, parameter);
+ }
+ }
+
+ private String getClusterInfoURL() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(selector.getSelectedUrl()).append("/").append(Constants.YARN_API_CLUSTER_INFO).append("?" + Constants.ANONYMOUS_PARAMETER);
+ return sb.toString();
+ }
+
+ public ClusterInfo getClusterInfo() throws Exception {
+ InputStream is = null;
+ try {
+ checkUrl();
+ final String urlString = getClusterInfoURL();
+ LOG.info("Calling yarn api to fetch cluster info: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
+ final ClusterInfoWrapper clusterInfoWrapper = OBJ_MAPPER.readValue(is, ClusterInfoWrapper.class);
+ if (clusterInfoWrapper != null && clusterInfoWrapper.getClusterInfo() != null) {
+ return clusterInfoWrapper.getClusterInfo();
+ }
+ return null;
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
index cd475e7..f920ddb 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch;
+package org.apache.eagle.jpm.util.resourcefetch;
import org.apache.eagle.jpm.util.Constants;
@@ -22,6 +22,6 @@ import java.util.List;
public interface ResourceFetcher<T> {
//continue to refactor later
- List<T> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
+ List<T> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
index 689d2f3..ce2d9b8 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
@@ -17,13 +17,13 @@
*
*/
-package org.apache.eagle.jpm.util.resourceFetch;
+package org.apache.eagle.jpm.util.resourcefetch;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
-import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
-import org.apache.eagle.jpm.util.resourceFetch.url.SparkJobServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourcefetch.url.SparkJobServiceURLBuilderImpl;
import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
@@ -49,10 +49,11 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
- public SparkHistoryServerResourceFetcher(String historyServerURL, String userName, String pwd){
+ public SparkHistoryServerResourceFetcher(String historyServerURL, String userName, String pwd) {
this.historyServerURL = historyServerURL;
this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl();
- this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName, pwd).getBytes()));;
+ this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName, pwd).getBytes()));
+ ;
}
private List<SparkApplication> doFetchSparkApplicationDetail(String appId) throws Exception {
@@ -66,14 +67,20 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA
} catch (FileNotFoundException e) {
return null;
} finally {
- if (is != null) { try {is.close();} catch (Exception e) { } }
+ if (is != null) {
+ try {
+ is.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
}
}
- public List<SparkApplication> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{
- switch(resoureType) {
+ public List<SparkApplication> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
+ switch (resoureType) {
case SPARK_JOB_DETAIL:
- return doFetchSparkApplicationDetail((String)parameter[0]);
+ return doFetchSparkApplicationDetail((String) parameter[0]);
default:
throw new Exception("Not support resourceType :" + resoureType);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
index 6d3fa45..e99e2aa 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.connection;
+package org.apache.eagle.jpm.util.resourcefetch.connection;
import org.apache.eagle.jpm.util.Constants;
@@ -26,44 +26,43 @@ import java.util.zip.GZIPInputStream;
public class InputStreamUtils {
- private static final int CONNECTION_TIMEOUT = 10 * 1000;
- private static final int READ_TIMEOUT = 5 * 60 * 1000;
- private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
- private static final String GZIP_COMPRESSION = "gzip";
-
- private static InputStream openGZIPInputStream(URL url, String auth, int timeout) throws IOException {
- final URLConnection connection = url.openConnection();
- connection.setConnectTimeout(CONNECTION_TIMEOUT);
- connection.setReadTimeout(timeout);
- connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
- if (null != auth){
- connection.setRequestProperty ("Authorization", auth);
- }
- return new GZIPInputStream(connection.getInputStream());
- }
-
- private static InputStream openInputStream(URL url, String auth, int timeout) throws IOException {
- URLConnection connection = url.openConnection();
- connection.setConnectTimeout(timeout);
- if (null != auth){
- connection.setRequestProperty ("Authorization", auth);
- }
+ private static final int CONNECTION_TIMEOUT = 10 * 1000;
+ private static final int READ_TIMEOUT = 5 * 60 * 1000;
+ private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+ private static final String GZIP_COMPRESSION = "gzip";
- return connection.getInputStream();
- }
-
- public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType, int timeout) throws Exception {
- final URL url = URLConnectionUtils.getUrl(urlString);
- if (compressionType.equals(Constants.CompressionType.GZIP)) {
- return openGZIPInputStream(url, auth, timeout);
- }
- else { // CompressionType.NONE
- return openInputStream(url, auth, timeout);
- }
- }
-
- public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType) throws Exception {
- return getInputStream(urlString, auth, compressionType, READ_TIMEOUT);
- }
+ private static InputStream openGZIPInputStream(URL url, String auth, int timeout) throws IOException {
+ final URLConnection connection = url.openConnection();
+ connection.setConnectTimeout(CONNECTION_TIMEOUT);
+ connection.setReadTimeout(timeout);
+ connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
+ if (null != auth) {
+ connection.setRequestProperty("Authorization", auth);
+ }
+ return new GZIPInputStream(connection.getInputStream());
+ }
+
+ private static InputStream openInputStream(URL url, String auth, int timeout) throws IOException {
+ URLConnection connection = url.openConnection();
+ connection.setConnectTimeout(timeout);
+ if (null != auth) {
+ connection.setRequestProperty("Authorization", auth);
+ }
+
+ return connection.getInputStream();
+ }
+
+ public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType, int timeout) throws Exception {
+ final URL url = URLConnectionUtils.getUrl(urlString);
+ if (compressionType.equals(Constants.CompressionType.GZIP)) {
+ return openGZIPInputStream(url, auth, timeout);
+ } else { // CompressionType.NONE
+ return openInputStream(url, auth, timeout);
+ }
+ }
+
+ public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType) throws Exception {
+ return getInputStream(urlString, auth, compressionType, READ_TIMEOUT);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
index 2e7b248..44d27e8 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
@@ -14,30 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.connection;
+package org.apache.eagle.jpm.util.resourcefetch.connection;
import org.apache.eagle.jpm.util.Constants;
public class JobUtils {
-
- public static String checkAndAddLastSlash(String urlBase) {
- if (!urlBase.endsWith("/")) {
- return urlBase + "/";
- }
- return urlBase;
- }
-
- public static String getJobIDByAppID(String appID) {
- if (appID.startsWith(Constants.APPLICATION_PREFIX)) {
- return appID.replace(Constants.APPLICATION_PREFIX, Constants.JOB_PREFIX);
- }
- return null;
- }
- public static String getAppIDByJobID(String jobID) {
- if (jobID.startsWith(Constants.JOB_PREFIX)) {
- return jobID.replace(Constants.JOB_PREFIX, Constants.APPLICATION_PREFIX);
- }
- return null;
- }
+ public static String checkAndAddLastSlash(String urlBase) {
+ if (!urlBase.endsWith("/")) {
+ return urlBase + "/";
+ }
+ return urlBase;
+ }
+
+ public static String getJobIDByAppID(String appID) {
+ if (appID.startsWith(Constants.APPLICATION_PREFIX)) {
+ return appID.replace(Constants.APPLICATION_PREFIX, Constants.JOB_PREFIX);
+ }
+ return null;
+ }
+
+ public static String getAppIDByJobID(String jobID) {
+ if (jobID.startsWith(Constants.JOB_PREFIX)) {
+ return jobID.replace(Constants.JOB_PREFIX, Constants.APPLICATION_PREFIX);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
index d340d7b..2cd60ba 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
@@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.connection;
+package org.apache.eagle.jpm.util.resourcefetch.connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.*;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -27,76 +25,77 @@ import java.net.URLConnection;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
+import javax.net.ssl.*;
public final class URLConnectionUtils {
- //TODO: change some public method to private
+ //TODO: change some public method to private
private static final Logger LOG = LoggerFactory.getLogger(URLConnectionUtils.class);
-
- public static URLConnection getConnection(String url) throws Exception {
- if (url.startsWith("https://")) {
- return getHTTPSConnection(url);
- } else if (url.startsWith("http://")) {
- return getHTTPConnection(url);
- }
- throw new Exception("Invalid input argument url: " + url);
- }
- public static URLConnection getHTTPConnection(String urlString) throws Exception {
- final URL url = new URL(urlString);
- return url.openConnection();
- }
+ public static URLConnection getConnection(String url) throws Exception {
+ if (url.startsWith("https://")) {
+ return getHTTPSConnection(url);
+ } else if (url.startsWith("http://")) {
+ return getHTTPConnection(url);
+ }
+ throw new Exception("Invalid input argument url: " + url);
+ }
+
+ public static URLConnection getHTTPConnection(String urlString) throws Exception {
+ final URL url = new URL(urlString);
+ return url.openConnection();
+ }
- public static URL getUrl(String urlString) throws Exception {
- if(urlString.toLowerCase().contains("https")){
- return getHTTPSUrl(urlString);
- }else if (urlString.toLowerCase().contains("http")) {
- return getURL(urlString);
- }
- throw new Exception("Invalid input argument url: " + urlString);
- }
-
- public static URL getURL(String urlString) throws MalformedURLException {
- return new URL(urlString);
- }
-
- public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException {
- // Create a trust manager that does not validate certificate chains
+ public static URL getUrl(String urlString) throws Exception {
+ if (urlString.toLowerCase().contains("https")) {
+ return getHTTPSUrl(urlString);
+ } else if (urlString.toLowerCase().contains("http")) {
+ return getURL(urlString);
+ }
+ throw new Exception("Invalid input argument url: " + urlString);
+ }
+
+ public static URL getURL(String urlString) throws MalformedURLException {
+ return new URL(urlString);
+ }
+
+ public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException {
+ // Create a trust manager that does not validate certificate chains
final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
// Install the all-trusting trust manager
- final SSLContext sc = SSLContext.getInstance("SSL");
- sc.init(null, trustAllCerts, new java.security.SecureRandom());
- HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+ final SSLContext sc = SSLContext.getInstance("SSL");
+ sc.init(null, trustAllCerts, new java.security.SecureRandom());
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
// Create all-trusting host name verifier
- final HostnameVerifier allHostsValid = new HostnameVerifier() {
- public boolean verify(String hostname, SSLSession session) {
- return true;
- }
+ final HostnameVerifier allHostsValid = new HostnameVerifier() {
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
};
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
return new URL(urlString);
- }
+ }
+
+ public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException {
+ final URL url = getHTTPSUrl(urlString);
+ return url.openConnection();
+ }
- public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException {
- final URL url = getHTTPSUrl(urlString);
- return url.openConnection();
- }
-
- public static class TrustAllX509TrustManager implements X509TrustManager {
- @Override
- public void checkClientTrusted(
- java.security.cert.X509Certificate[] chain, String authType)
- throws CertificateException {
- }
+ public static class TrustAllX509TrustManager implements X509TrustManager {
+ @Override
+ public void checkClientTrusted(
+ java.security.cert.X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
- @Override
- public void checkServerTrusted(
- java.security.cert.X509Certificate[] chain, String authType)
- throws CertificateException {
- }
+ @Override
+ public void checkServerTrusted(
+ java.security.cert.X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
- @Override
- public java.security.cert.X509Certificate[] getAcceptedIssuers() {
- return null;
- }
- }
+ @Override
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/AbstractURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/AbstractURLSelector.java
index 57c2902..d25d05b 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/AbstractURLSelector.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/AbstractURLSelector.java
@@ -16,10 +16,10 @@
*
*/
-package org.apache.eagle.jpm.util.resourceFetch.ha;
+package org.apache.eagle.jpm.util.resourcefetch.ha;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +51,13 @@ public abstract class AbstractURLSelector implements HAURLSelector {
LOG.info("get input stream from url: " + urlString + " failed. ");
return false;
} finally {
- if (is != null) { try { is.close(); } catch (IOException e) {/*Do nothing*/} }
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ LOG.warn("{}", e);
+ }
+ }
}
return true;
}
@@ -66,9 +72,13 @@ public abstract class AbstractURLSelector implements HAURLSelector {
@Override
public void reSelectUrl() throws IOException {
- if (reselectInProgress) return;
- synchronized(this) {
- if (reselectInProgress) return;
+ if (reselectInProgress) {
+ return;
+ }
+ synchronized (this) {
+ if (reselectInProgress) {
+ return;
+ }
reselectInProgress = true;
try {
LOG.info("Going to reselect url");
@@ -81,16 +91,16 @@ public abstract class AbstractURLSelector implements HAURLSelector {
LOG.info("Successfully switch to new url : " + selectedUrl);
return;
}
- LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. ");
+ LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
try {
Thread.sleep(5 * 1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("{}", ex);
}
- catch (InterruptedException ex) { /* Do Nothing */}
}
}
- throw new IOException("No alive url found: "+ StringUtils.join(";", Arrays.asList(this.urls)));
- }
- finally {
+ throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
+ } finally {
reselectInProgress = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
index 6eea7e3..fa9b52b 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.ha;
+package org.apache.eagle.jpm.util.resourcefetch.ha;
import java.io.IOException;
public interface HAURLSelector {
-
- boolean checkUrl(String url);
-
- void reSelectUrl() throws IOException;
-
- String getSelectedUrl();
+
+ boolean checkUrl(String url);
+
+ void reSelectUrl() throws IOException;
+
+ String getSelectedUrl();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
index 4052ed0..7c188c6 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.ha;
+package org.apache.eagle.jpm.util.resourcefetch.ha;
import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,73 +29,81 @@ import java.util.Arrays;
public class HAURLSelectorImpl implements HAURLSelector {
- private final String[] urls;
- private volatile String selectedUrl;
- private final ServiceURLBuilder builder;
-
- private volatile boolean reselectInProgress;
- private final Constants.CompressionType compressionType;
- private static final long MAX_RETRY_TIME = 2;
- private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
-
- public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) {
- this.urls = urls;
- this.compressionType = compressionType;
- this.builder = builder;
- }
-
- public boolean checkUrl(String urlString) {
- InputStream is = null;
- try {
- is = InputStreamUtils.getInputStream(urlString, null, compressionType);
- }
- catch (Exception ex) {
- LOG.info("get inputstream from url: " + urlString + " failed. ");
- return false;
- }
- finally {
- if (is != null) { try { is.close(); } catch (IOException e) {/*Do nothing*/} }
- }
- return true;
- }
+ private final String[] urls;
+ private volatile String selectedUrl;
+ private final ServiceURLBuilder builder;
- @Override
- public String getSelectedUrl() {
- if (selectedUrl == null) {
- selectedUrl = urls[0];
- }
- return selectedUrl;
- }
-
- @Override
- public void reSelectUrl() throws IOException {
- if (reselectInProgress) return;
- synchronized(this) {
- if (reselectInProgress) return;
- reselectInProgress = true;
- try {
- LOG.info("Going to reselect url");
- for (int i = 0; i < urls.length; i++) {
- String urlToCheck = urls[i];
- LOG.info("Going to try url :" + urlToCheck);
- for (int time = 0; time < MAX_RETRY_TIME; time++) {
- if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) {
- selectedUrl = urls[i];
- LOG.info("Successfully switch to new url : " + selectedUrl);
- return;
- }
- LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. ");
- try {
- Thread.sleep(1 * 1000);
- }
- catch (InterruptedException ex) { /* Do Nothing */}
- }
- }
- throw new IOException("No alive url found: "+ StringUtils.join(";", Arrays.asList(this.urls)));
- }
- finally {
- reselectInProgress = false;
- }
- }
- }
+ private volatile boolean reselectInProgress;
+ private final Constants.CompressionType compressionType;
+ private static final long MAX_RETRY_TIME = 2;
+ private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
+
+ public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) {
+ this.urls = urls;
+ this.compressionType = compressionType;
+ this.builder = builder;
+ }
+
+ public boolean checkUrl(String urlString) {
+ InputStream is = null;
+ try {
+ is = InputStreamUtils.getInputStream(urlString, null, compressionType);
+ } catch (Exception ex) {
+ LOG.info("get inputstream from url: " + urlString + " failed. ");
+ return false;
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String getSelectedUrl() {
+ if (selectedUrl == null) {
+ selectedUrl = urls[0];
+ }
+ return selectedUrl;
+ }
+
+ @Override
+ public void reSelectUrl() throws IOException {
+ if (reselectInProgress) {
+ return;
+ }
+ synchronized (this) {
+ if (reselectInProgress) {
+ return;
+ }
+ reselectInProgress = true;
+ try {
+ LOG.info("Going to reselect url");
+ for (int i = 0; i < urls.length; i++) {
+ String urlToCheck = urls[i];
+ LOG.info("Going to try url :" + urlToCheck);
+ for (int time = 0; time < MAX_RETRY_TIME; time++) {
+ if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) {
+ selectedUrl = urls[i];
+ LOG.info("Successfully switch to new url : " + selectedUrl);
+ return;
+ }
+ LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
+ try {
+ Thread.sleep(1 * 1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("{}", ex);
+ }
+ }
+ }
+ throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
+ } finally {
+ reselectInProgress = false;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
index 5f13616..f769217 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
@@ -14,20 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.io.Serializable;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AppInfo implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String id;
- private String user;
+ private static final long serialVersionUID = 1L;
+
+ private String id;
+ private String user;
private String name;
private String queue;
private String state;
@@ -46,109 +46,142 @@ public class AppInfo implements Serializable {
private long allocatedMB;
private int allocatedVCores;
private int runningContainers;
-
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- public String getUser() {
- return user;
- }
- public void setUser(String user) {
- this.user = user;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getQueue() {
- return queue;
- }
- public void setQueue(String queue) {
- this.queue = queue;
- }
- public String getState() {
- return state;
- }
- public void setState(String state) {
- this.state = state;
- }
- public String getFinalStatus() {
- return finalStatus;
- }
- public void setFinalStatus(String finalStatus) {
- this.finalStatus = finalStatus;
- }
- public double getProgress() {
- return progress;
- }
- public void setProgress(double progress) {
- this.progress = progress;
- }
- public String getTrackingUI() {
- return trackingUI;
- }
- public void setTrackingUI(String trackingUI) {
- this.trackingUI = trackingUI;
- }
- public String getTrackingUrl() {
- return trackingUrl;
- }
- public void setTrackingUrl(String trackingUrl) {
- this.trackingUrl = trackingUrl;
- }
- public String getDiagnostics() {
- return diagnostics;
- }
- public void setDiagnostics(String diagnostics) {
- this.diagnostics = diagnostics;
- }
- public String getClusterId() {
- return clusterId;
- }
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
- public String getApplicationType() {
- return applicationType;
- }
- public void setApplicationType(String applicationType) {
- this.applicationType = applicationType;
- }
- public long getStartedTime() {
- return startedTime;
- }
- public void setStartedTime(long startedTime) {
- this.startedTime = startedTime;
- }
- public long getFinishedTime() {
- return finishedTime;
- }
- public void setFinishedTime(long finishedTime) {
- this.finishedTime = finishedTime;
- }
- public long getElapsedTime() {
- return elapsedTime;
- }
- public void setElapsedTime(long elapsedTime) {
- this.elapsedTime = elapsedTime;
- }
- public String getAmContainerLogs() {
- return amContainerLogs;
- }
- public void setAmContainerLogs(String amContainerLogs) {
- this.amContainerLogs = amContainerLogs;
- }
- public String getAmHostHttpAddress() {
- return amHostHttpAddress;
- }
- public void setAmHostHttpAddress(String amHostHttpAddress) {
- this.amHostHttpAddress = amHostHttpAddress;
- }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public String getFinalStatus() {
+ return finalStatus;
+ }
+
+ public void setFinalStatus(String finalStatus) {
+ this.finalStatus = finalStatus;
+ }
+
+ public double getProgress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+
+ public String getTrackingUI() {
+ return trackingUI;
+ }
+
+ public void setTrackingUI(String trackingUI) {
+ this.trackingUI = trackingUI;
+ }
+
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public void setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public String getApplicationType() {
+ return applicationType;
+ }
+
+ public void setApplicationType(String applicationType) {
+ this.applicationType = applicationType;
+ }
+
+ public long getStartedTime() {
+ return startedTime;
+ }
+
+ public void setStartedTime(long startedTime) {
+ this.startedTime = startedTime;
+ }
+
+ public long getFinishedTime() {
+ return finishedTime;
+ }
+
+ public void setFinishedTime(long finishedTime) {
+ this.finishedTime = finishedTime;
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+
+ public String getAmContainerLogs() {
+ return amContainerLogs;
+ }
+
+ public void setAmContainerLogs(String amContainerLogs) {
+ this.amContainerLogs = amContainerLogs;
+ }
+
+ public String getAmHostHttpAddress() {
+ return amHostHttpAddress;
+ }
+
+ public void setAmHostHttpAddress(String amHostHttpAddress) {
+ this.amHostHttpAddress = amHostHttpAddress;
+ }
public long getAllocatedMB() {
return allocatedMB;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
index 741fa1d..22ee2b9 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
@@ -14,25 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.List;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Applications {
- private List<AppInfo> app;
+ private List<AppInfo> app;
- public List<AppInfo> getApp() {
- return app;
- }
+ public List<AppInfo> getApp() {
+ return app;
+ }
+
+ public void setApp(List<AppInfo> app) {
+ this.app = app;
+ }
- public void setApp(List<AppInfo> app) {
- this.app = app;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
index 42ff4f8..63e37eb 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
@@ -14,24 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AppsWrapper {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private Applications apps;
+ private Applications apps;
- public Applications getApps() {
- return apps;
- }
+ public Applications getApps() {
+ return apps;
+ }
+
+ public void setApps(Applications apps) {
+ this.apps = apps;
+ }
- public void setApps(Applications apps) {
- this.apps = apps;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfo.java
index 75a1e8a..6e16b7f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfo.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfo.java
@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.io.Serializable;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterInfo implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfoWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfoWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfoWrapper.java
index 301487e..4359e66 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfoWrapper.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/ClusterInfoWrapper.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusterInfoWrapper {
private ClusterInfo clusterInfo;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/JobCounterGroup.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/JobCounterGroup.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/JobCounterGroup.java
index 102843f..189767b 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/JobCounterGroup.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/JobCounterGroup.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.eagle.jpm.util.resourceFetch.model;
+package org.apache.eagle.jpm.util.resourcefetch.model;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.List;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobCounterGroup {
private String counterGroupName;
@@ -30,12 +31,15 @@ public class JobCounterGroup {
public String getCounterGroupName() {
return counterGroupName;
}
+
public void setCounterGroupName(String counterGroupName) {
this.counterGroupName = counterGroupName;
}
+
public List<JobCounterItem> getCounter() {
return counter;
}
+
public void setCounter(List<JobCounterItem> counter) {
this.counter = counter;
}