Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:47 UTC

[40/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
new file mode 100644
index 0000000..fb77bc7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+import org.apache.eagle.jobrunning.crawler.JobContext;
+
+public class DefaultRunningJobInputStreamCallback implements RunningJobCallback{
+	
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultRunningJobInputStreamCallback.class);
+	
+	private EagleOutputCollector eagleCollector;
+	
+	public DefaultRunningJobInputStreamCallback(EagleOutputCollector eagleCollector){
+		this.eagleCollector = eagleCollector;
+	}
+
+	@Override
+	public void onJobRunningInformation(JobContext context, ResourceType type, List<Object> objects) {
+		String jobId = context.jobId;
+		LOG.info(jobId + " information fetched, type: " + type);
+		if (type.equals(ResourceType.JOB_CONFIGURATION)) {
+			@SuppressWarnings("unchecked")
+			Map<String, String> config = (Map<String, String>) objects.get(0);
+			// the first field is fixed as messageId
+			RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, context.fetchedTime);
+			eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, config));
+		}
+		else if (type.equals(ResourceType.JOB_RUNNING_INFO) || type.equals(ResourceType.JOB_COMPLETE_INFO)) {
+			// Here timestamp is meaningless, set to null
+			RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, null);
+			eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, objects));
+		}
+	}
+}
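
A minimal, hypothetical wiring sketch (not part of this patch; the collector instance, job id, user and config entry are placeholders) showing how a fetcher thread could hand a freshly fetched job configuration to this callback:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.eagle.dataproc.core.EagleOutputCollector;
    import org.apache.eagle.jobrunning.callback.DefaultRunningJobInputStreamCallback;
    import org.apache.eagle.jobrunning.callback.RunningJobCallback;
    import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
    import org.apache.eagle.jobrunning.crawler.JobContext;

    public class CallbackUsageSketch {
        // "collector" is assumed to be supplied by the surrounding Storm topology
        public static void emitJobConfig(EagleOutputCollector collector) {
            RunningJobCallback callback = new DefaultRunningJobInputStreamCallback(collector);
            JobContext context = new JobContext("job_1400000000000_0001", "hadoop", System.currentTimeMillis());
            Map<String, String> jobConfig = new HashMap<String, String>();
            jobConfig.put("mapreduce.job.queuename", "default"); // illustrative config entry
            // emits the tuple (messageId, user, jobId, type, config) through the collector
            callback.onJobRunningInformation(context, ResourceType.JOB_CONFIGURATION,
                    Collections.<Object>singletonList(jobConfig));
        }
    }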

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
new file mode 100644
index 0000000..f36e8a5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.eagle.jobrunning.crawler.JobContext;
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Callback invoked when running job information has been fetched and is ready for processing.
+ */
+public interface RunningJobCallback extends Serializable {
+
+	/**
+	 * Called when a running job resource of the given type is ready.
+	 * @param jobContext job context (job id, user, fetch time)
+	 * @param type type of the fetched resource
+	 * @param objects the fetched resource objects
+	 */
+	void onJobRunningInformation(JobContext jobContext, JobConstants.ResourceType type, List<Object> objects);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
new file mode 100644
index 0000000..08b58f2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import com.google.common.base.Objects;
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class RunningJobMessageId {
+	public String jobID;
+	public JobConstants.ResourceType type;
+	// If type = JOB_RUNNING_INFO, timestamp = fetchedTime, otherwise timestamp is meaningless, set to null
+	public Long timestamp;
+	
+	public RunningJobMessageId(String jobID, JobConstants.ResourceType type, Long timestamp) {
+		this.jobID = jobID;
+		this.type = type;
+		this.timestamp = timestamp;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final RunningJobMessageId other = (RunningJobMessageId) obj;
+        return Objects.equal(this.jobID, other.jobID) 
+        	  && Objects.equal(this.type, other.type)
+        	  && Objects.equal(this.timestamp, other.timestamp);
+	}
+	
+	@Override
+	public int hashCode() {
+		// null-safe: timestamp may be null
+		return Objects.hashCode(jobID, type, timestamp);
+	}
+	
+	@Override
+	public String toString() {
+		return "jobID=" + jobID
+			 + ", type=" + type.name()
+			 + ", timestamp=" + timestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
new file mode 100644
index 0000000..6a1d7c8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.common;
+
+public class JobConstants {
+	public static final String APPLICATION_PREFIX = "application";
+	public static final String JOB_PREFIX = "job";
+	public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
+	public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";	
+	public static final String V2_APPS_URL = "ws/v1/cluster/apps";
+	public static final String ANONYMOUS_PARAMETER = "anonymous=true";
+	
+	public static final String V2_PROXY_PREFIX_URL = "proxy/";
+	public static final String V2_APP_DETAIL_URL = "/ws/v1/mapreduce/jobs";
+	public static final String V2_MR_APPMASTER_PREFIX = "/ws/v1/mapreduce/jobs/";
+	public static final String V2_CONF_URL = "/conf";
+	public static final String V2_COMPLETE_APPS_URL = "ws/v1/cluster/apps/";
+	public static final String V2_MR_COUNTERS_URL = "/counters";
+	
+
+	public static final String HIVE_QUERY_STRING = "hive.query.string";
+	public static final String JOB_STATE_RUNNING = "RUNNING";
+	
+	public enum YarnApplicationType {
+		MAPREDUCE, UNKNOWN
+	}
+	
+	public enum CompressionType {
+		GZIP, NONE
+	}
+	
+	public enum ResourceType {
+		JOB_RUNNING_INFO, JOB_COMPLETE_INFO, JOB_CONFIGURATION, JOB_LIST
+	}
+	
+	public enum JobState {
+		RUNNING, COMPLETED, ALL
+	}
+}
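
For orientation (the base URL is a placeholder and the real ServiceURLBuilder implementations are not in this excerpt), the REST path constants above compose into ResourceManager calls roughly like this:

    import org.apache.eagle.jobrunning.common.JobConstants;

    public class UrlCompositionSketch {
        public static void main(String[] args) {
            String rmBase = "http://resourcemanager:8088/"; // assumed RM web address
            String runningApps = rmBase + JobConstants.V2_APPS_RUNNING_URL + "&" + JobConstants.ANONYMOUS_PARAMETER;
            // -> http://resourcemanager:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true
            System.out.println(runningApps);
        }
    }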

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
new file mode 100644
index 0000000..b17a41d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.config;
+
+import java.io.Serializable;
+
+import org.apache.eagle.job.JobPartitioner;
+
+public class RunningJobCrawlConfig implements Serializable{
+	private static final long serialVersionUID = 1L;
+	public RunningJobEndpointConfig endPointConfig;
+	public ControlConfig controlConfig;
+	public ZKStateConfig zkStateConfig;
+
+	public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){
+		this.endPointConfig = endPointConfig;
+		this.controlConfig = controlConfig;
+		this.zkStateConfig = zkStateConfig;
+	}	
+    
+    public static class RunningJobEndpointConfig implements Serializable{
+		private static final long serialVersionUID = 1L;
+		public String[] RMBasePaths;
+		public String HSBasePath;
+    }
+    
+    public static class ControlConfig implements Serializable{
+		private static final long serialVersionUID = 1L;
+    	public boolean jobConfigEnabled; 
+    	public boolean jobInfoEnabled;
+    	public int zkCleanupTimeInday;
+    	public int completedJobOutofDateTimeInMin;
+    	public int sizeOfJobConfigQueue;
+    	public int sizeOfJobCompletedInfoQueue;
+        public Class<? extends JobPartitioner> partitionerCls;
+        public int numTotalPartitions = 1;
+    }
+    
+	public static class ZKStateConfig implements Serializable{
+		private static final long serialVersionUID = 1L;
+		public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+    }
+}
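
A hedged construction example (all values below are placeholders) showing how the three public sub-configs are filled and combined:

    import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;

    public class CrawlConfigSketch {
        public static RunningJobCrawlConfig build() {
            RunningJobCrawlConfig.RunningJobEndpointConfig ep = new RunningJobCrawlConfig.RunningJobEndpointConfig();
            ep.RMBasePaths = new String[] { "http://rm1:8088/", "http://rm2:8088/" }; // HA ResourceManagers
            ep.HSBasePath = "http://historyserver:19888/";

            RunningJobCrawlConfig.ControlConfig ctrl = new RunningJobCrawlConfig.ControlConfig();
            ctrl.jobConfigEnabled = true;
            ctrl.jobInfoEnabled = true;
            ctrl.zkCleanupTimeInday = 2;                 // purge ZK state older than two days
            ctrl.completedJobOutofDateTimeInMin = 60;    // ignore jobs finished more than an hour ago
            ctrl.sizeOfJobConfigQueue = 10000;
            ctrl.sizeOfJobCompletedInfoQueue = 10000;
            // ctrl.partitionerCls would normally point at a JobPartitioner implementation (omitted here)

            RunningJobCrawlConfig.ZKStateConfig zk = new RunningJobCrawlConfig.ZKStateConfig();
            zk.zkQuorum = "zk1:2181,zk2:2181";
            zk.zkRoot = "/jobrunning";
            zk.zkSessionTimeoutMs = 15000;
            zk.zkRetryTimes = 3;
            zk.zkRetryInterval = 2000;

            return new RunningJobCrawlConfig(ep, ctrl, zk);
        }
    }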

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
new file mode 100644
index 0000000..146fbe8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.List;
+
+public interface CounterGroupKey {
+
+	String getName();
+	String getDescription();
+	int getIndex();
+	int getCounterNumber();
+	List<CounterKey> listCounterKeys();
+	CounterKey getCounterKeyByName(String name);
+	CounterKey getCounterKeyByID(int index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
new file mode 100644
index 0000000..a39d7b4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.List;
+
+public interface CounterKey {
+
+	List<String> getNames();
+	String getDescription();
+	int getIndex();
+	CounterGroupKey getGroupKey();
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
new file mode 100644
index 0000000..ba44ac6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public final class JobCounters {
+	
+	private Map<String, Map<String, Long>> counters = new TreeMap<String, Map<String, Long>>();
+
+	public Map<String, Map<String, Long>> getCounters() {
+		return counters;
+	}
+
+	public void setCounters(Map<String, Map<String, Long>> counters) {
+		this.counters = counters;
+	}
+	
+	public String toString(){
+		return counters.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
new file mode 100644
index 0000000..d0f3ae9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter.parser;
+
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+
+public interface JobCountersParser {
+	
+	Map<String, Long> parse(Document doc);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
new file mode 100644
index 0000000..f098c54
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter.parser;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+import org.jsoup.nodes.Element;
+import org.jsoup.select.Elements;
+
+public class JobCountersParserImpl implements JobCountersParser {
+
+	@Override
+	public Map<String, Long> parse(Document doc) {
+		Elements elements = doc.select("a[href*=singlejobcounter]");
+		Iterator<Element> iter = elements.iterator();
+		Map<String, Long> counters = new HashMap<String, Long>();
+		while(iter.hasNext()) {
+			Element element = iter.next().parent();
+			String metricName = element.text();
+			long metricValue = Long.parseLong(element.nextElementSibling()
+								   .nextElementSibling().nextElementSibling().text()
+								    .replace(",", "").trim());
+			counters.put(metricName, metricValue);
+		}
+		return counters;
+	}
+}
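
A small self-contained sketch of the parser above; the HTML fragment is synthetic and only assumes the layout this parser evidently targets, where the cell holding a singlejobcounter link is followed three sibling cells later by the job-total value:

    import java.util.Map;

    import org.jsoup.Jsoup;
    import org.jsoup.nodes.Document;

    import org.apache.eagle.jobrunning.counter.parser.JobCountersParserImpl;

    public class CountersParserSketch {
        public static void main(String[] args) {
            String html = "<table><tr>"
                    + "<td><a href='singlejobcounter'>FILE_BYTES_READ</a></td>"
                    + "<td>10</td><td>20</td><td>1,234</td>" // the parser reads the third sibling cell
                    + "</tr></table>";
            Document doc = Jsoup.parse(html);
            Map<String, Long> counters = new JobCountersParserImpl().parse(doc);
            System.out.println(counters); // {FILE_BYTES_READ=1234}
        }
    }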

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
new file mode 100644
index 0000000..d9cf79c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+
+public class ConfigWorkTask implements Runnable {
+	
+	public JobContext context;
+	public ResourceFetcher fetcher;
+	public RunningJobCallback callback;
+	public RunningJobCrawler crawler;	
+	
+	private static final Logger LOG = LoggerFactory.getLogger(ConfigWorkTask.class);
+
+	public ConfigWorkTask(JobContext context, ResourceFetcher fetcher, RunningJobCallback callback, RunningJobCrawler crawler) {
+		this.context = context;
+		this.fetcher = fetcher;
+		this.callback = callback;
+		this.crawler = crawler;
+	}
+	
+	public void run() {
+		runConfigCrawlerWorkThread(context);
+	}
+
+	private void runConfigCrawlerWorkThread(JobContext context) {
+		LOG.info("Going to fetch job configuration information, jobId: " + context.jobId);
+		try {
+			List<Object> objs = fetcher.getResource(ResourceType.JOB_CONFIGURATION, JobUtils.getAppIDByJobID(context.jobId));
+			callback.onJobRunningInformation(context, ResourceType.JOB_CONFIGURATION, objs);
+		}
+		catch (Exception ex) {
+	        if (ex.getMessage() != null && ex.getMessage().contains("Server returned HTTP response code: 500")) {
+	        	LOG.warn("Server returned a 500 error, probably caused by the job's ACL settings; skipping this job");
+	        	// the job stays in the processing list, so we do not retry fruitlessly next round
+	        	// TODO remove it from the processing list when the job finishes, to avoid a memory leak
+	        }
+	        else {
+	        	LOG.warn("Got an exception when fetching job config: ", ex);
+	        	crawler.removeFromProcessingList(ResourceType.JOB_CONFIGURATION, context);
+	        }
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
new file mode 100644
index 0000000..7599457
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+public class JobContext {
+	public String jobId;
+	public String user;
+	public Long fetchedTime;
+	
+	public JobContext() {
+		
+	}
+	
+	public JobContext(JobContext context) {
+		this.jobId = context.jobId;
+		this.user = context.user;
+		this.fetchedTime = context.fetchedTime;
+	}
+	
+	public JobContext(String jobId, String user, Long fetchedTime) {
+		this.jobId = jobId;
+		this.user = user;
+		this.fetchedTime = fetchedTime;
+	}
+	
+	@Override
+	public int hashCode() {	
+		return jobId.hashCode();
+	}			
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof JobContext) {
+			JobContext context = (JobContext)obj;
+			if (this.jobId.equals(context.jobId)) {
+				return true;
+			}
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
new file mode 100644
index 0000000..7c41431
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.ZipException;
+
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.eagle.jobrunning.counter.parser.JobCountersParser;
+import org.apache.eagle.jobrunning.counter.parser.JobCountersParserImpl;
+import org.apache.eagle.jobrunning.ha.HAURLSelector;
+import org.apache.eagle.jobrunning.ha.HAURLSelectorImpl;
+import org.apache.eagle.jobrunning.job.conf.JobConfParser;
+import org.apache.eagle.jobrunning.job.conf.JobConfParserImpl;
+import org.apache.eagle.jobrunning.util.InputStreamUtils;
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.apache.eagle.jobrunning.util.URLConnectionUtils;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.yarn.model.AppWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobCompleteWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobCountersWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobsWrapper;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.jsoup.Jsoup;
+import org.jsoup.nodes.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.jobrunning.url.JobCompleteCounterServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCompleteDetailServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCompletedConfigServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCountersServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobDetailServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobRunningConfigServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobStatusServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
+import org.apache.eagle.jobrunning.yarn.model.AppInfo;
+import org.apache.eagle.jobrunning.yarn.model.AppsWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobDetailInfo;
+
+public class RMResourceFetcher implements ResourceFetcher{
+	
+	private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
+	private final HAURLSelector selector;
+	private final String historyBaseUrl;
+	private final ServiceURLBuilder jobListServiceURLBuilder;
+	private final ServiceURLBuilder jobDetailServiceURLBuilder;
+	private final ServiceURLBuilder jobCounterServiceURLBuilder;
+	private final ServiceURLBuilder jobRunningConfigServiceURLBuilder;
+	private final ServiceURLBuilder jobCompleteDetailServiceURLBuilder;
+	private final ServiceURLBuilder jobCompleteCounterServiceURLBuilder;
+	private final ServiceURLBuilder jobCompletedConfigServiceURLBuilder;
+	private final ServiceURLBuilder jobStatusServiceURLBuilder;
+		
+	private static final int CONNECTION_TIMEOUT = 10000;
+	private static final int READ_TIMEOUT = 10000;
+	private static final String XML_HTTP_HEADER = "Accept";
+	private static final String XML_FORMAT = "application/xml";
+	
+	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+	
+	static {
+		OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+	}
+	
+	public RMResourceFetcher(RunningJobCrawlConfig.RunningJobEndpointConfig config) {
+		this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+		this.jobDetailServiceURLBuilder = new JobDetailServiceURLBuilderImpl();
+		this.jobCounterServiceURLBuilder = new JobCountersServiceURLBuilderImpl();
+		this.jobRunningConfigServiceURLBuilder = new JobRunningConfigServiceURLBuilderImpl();
+		this.jobCompleteDetailServiceURLBuilder = new JobCompleteDetailServiceURLBuilderImpl();
+		this.jobCompleteCounterServiceURLBuilder = new JobCompleteCounterServiceURLBuilderImpl();
+		this.jobCompletedConfigServiceURLBuilder = new JobCompletedConfigServiceURLBuilderImpl();
+		this.jobStatusServiceURLBuilder = new JobStatusServiceURLBuilderImpl();
+
+		this.selector = new HAURLSelectorImpl(config.RMBasePaths, jobListServiceURLBuilder, JobConstants.CompressionType.GZIP);
+		this.historyBaseUrl = config.HSBasePath;
+	}
+	
+	private void checkUrl() throws IOException {
+		if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), JobConstants.JobState.RUNNING.name()))) {
+			selector.reSelectUrl();
+		}
+	}
+	
+	private List<Object> doFetchApplicationsList(String state) throws Exception {		
+		List<AppInfo> result = null;
+		InputStream is = null;
+		try {
+			checkUrl();
+			final String urlString = jobListServiceURLBuilder.build(selector.getSelectedUrl(), state);
+			LOG.info("Going to call yarn api to fetch application list (state=" + state + "): " + urlString);
+			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+			final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+			if (appWrapper != null && appWrapper.getApps() != null
+					&& appWrapper.getApps().getApp() != null) {
+				result = appWrapper.getApps().getApp();
+				return Arrays.asList((Object)result);
+			}
+			return null;
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){} }
+		}
+	}
+	
+	private List<Object> doFetchRunningJobInfo(String appID) throws Exception{
+		InputStream is = null;
+		InputStream is2 = null;
+		try {
+			final String urlString = jobDetailServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+			LOG.info("Going to fetch job detail information for " + appID + " , url: " + urlString);
+			try {
+				is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+			}
+			catch (ZipException ex) {
+				// If the job has already completed, the request is redirected to the job history page and throws java.util.zip.ZipException
+				LOG.info(appID + " has finished, skip this job");
+				return null;
+			}
+			final JobsWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobsWrapper.class);
+			JobDetailInfo jobDetail = null;
+			if (jobWrapper != null && jobWrapper.getJobs() != null && jobWrapper.getJobs().getJob() != null
+				&& jobWrapper.getJobs().getJob().size() > 0) {
+				jobDetail = jobWrapper.getJobs().getJob().get(0);
+			}
+			final String urlString2 = jobCounterServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+			LOG.info("Going to fetch job counters for application " + appID + " , url: " + urlString2);
+			is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.GZIP);
+			final JobCountersWrapper jobCounterWrapper = OBJ_MAPPER.readValue(is2,JobCountersWrapper.class);
+			
+			return Arrays.asList(jobDetail, jobCounterWrapper);
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){} }
+			if (is2 != null) { try {is2.close();} catch (Exception e){} }
+		}
+	}
+	
+	private List<Object> doFetchCompleteJobInfo(String appId) throws Exception{
+		InputStream is = null;
+		InputStream is2 = null;
+		try {
+			checkUrl();
+			String jobID = JobUtils.getJobIDByAppID(appId);
+			String urlString = jobCompleteDetailServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
+			LOG.info("Going to fetch completed job information for " + jobID + ", url: " + urlString);
+			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+			final JobCompleteWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobCompleteWrapper.class);
+			
+			String urlString2 = jobCompleteCounterServiceURLBuilder.build(historyBaseUrl, jobID);
+			LOG.info("Going to fetch completed job counters for " + jobID + ", url: " + urlString2);
+			is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.NONE, (int) (2 * DateUtils.MILLIS_PER_MINUTE));
+			final Document doc = Jsoup.parse(is2, StandardCharsets.UTF_8.name(), urlString2);
+			JobCountersParser parser = new JobCountersParserImpl();
+			Map<String, Long> counters = parser.parse(doc);
+			return Arrays.asList(jobWrapper, counters);
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){}  }
+			if (is2 != null) { try {is2.close();} catch (Exception e){}  }
+		}
+	}
+	
+	private List<Object> doFetchRunningJobConfiguration(String appID) throws Exception {
+		InputStream is = null;
+		try {
+			checkUrl();
+			String jobID = JobUtils.getJobIDByAppID(appID);
+			String urlString = jobRunningConfigServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
+			LOG.info("Going to fetch running job configuration for " + jobID + ", url: " + urlString);
+			final URLConnection connection = URLConnectionUtils.getConnection(urlString);
+			connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
+			connection.setConnectTimeout(CONNECTION_TIMEOUT);
+			connection.setReadTimeout(READ_TIMEOUT);
+			is = connection.getInputStream();
+			Map<String, String> configs = XmlHelper.getConfigs(is);
+			return Arrays.asList((Object)configs);
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){}  }
+		}
+	}
+	
+	private List<Object> doFetchCompletedJobConfiguration(String appID) throws Exception {
+		InputStream is = null;
+		try {
+			String urlString = jobCompletedConfigServiceURLBuilder.build(historyBaseUrl, JobUtils.getJobIDByAppID(appID));
+			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.NONE);
+			final Document doc = Jsoup.parse(is, "UTF-8", urlString);
+			JobConfParser parser = new JobConfParserImpl();
+			Map<String, String> configs = parser.parse(doc);
+			return Arrays.asList((Object)configs);
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){}  }
+		}
+	}
+	
+	public boolean checkIfJobIsRunning(String appID) throws Exception{
+		InputStream is = null;
+		try {
+			checkUrl();
+			final String urlString = jobStatusServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+			LOG.info("Going to call yarn api to fetch job status: " + urlString);
+			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+			final AppWrapper appWrapper = OBJ_MAPPER.readValue(is, AppWrapper.class);
+			if (appWrapper != null && appWrapper.getApp() != null) {
+				AppInfo result = appWrapper.getApp();
+				if (result.getState().equals(JobConstants.JOB_STATE_RUNNING)) {
+					return true;
+				}
+				return false;
+			}
+			else {
+				LOG.error("The status of " + appID + " is not available");
+				throw new IllegalStateException("The status of " + appID + " is not available");
+			}
+		}
+		finally {
+			if (is != null) { try {is.close();} catch (Exception e){}  }
+		}
+	}
+	
+	public List<Object> getResource(JobConstants.ResourceType resourceType, Object... parameter) throws Exception {
+		switch(resourceType) {
+			case JOB_LIST:
+				return doFetchApplicationsList((String)parameter[0]);
+			case JOB_RUNNING_INFO:
+				//parameter[0]= appId
+				return doFetchRunningJobInfo((String)parameter[0]);
+			case JOB_COMPLETE_INFO:
+				//parameter[0]= appId
+				return doFetchCompleteJobInfo((String)parameter[0]);
+			case JOB_CONFIGURATION:
+				//parameter[0]= appId
+				boolean isRunning = checkIfJobIsRunning((String)parameter[0]);
+				if (isRunning)
+					return doFetchRunningJobConfiguration((String)parameter[0]);
+				else
+					return doFetchCompletedJobConfiguration((String)parameter[0]); 
+			default:
+				throw new Exception("Unsupported resourceType: " + resourceType);
+		}
+	}
+}
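
A hedged call sketch (endpoint addresses are placeholders) showing the resource-type dispatch above being used the same way the crawler uses it, to pull the running-application list:

    import java.util.List;

    import org.apache.eagle.jobrunning.common.JobConstants;
    import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
    import org.apache.eagle.jobrunning.crawler.RMResourceFetcher;
    import org.apache.eagle.jobrunning.crawler.ResourceFetcher;

    public class FetcherUsageSketch {
        public static List<Object> fetchRunningApps() throws Exception {
            RunningJobCrawlConfig.RunningJobEndpointConfig ep = new RunningJobCrawlConfig.RunningJobEndpointConfig();
            ep.RMBasePaths = new String[] { "http://resourcemanager:8088/" }; // placeholder RM address
            ep.HSBasePath = "http://historyserver:19888/";                    // placeholder history server
            ResourceFetcher fetcher = new RMResourceFetcher(ep);
            // JOB_LIST takes the desired job state name as its single parameter
            return fetcher.getResource(JobConstants.ResourceType.JOB_LIST,
                    JobConstants.JobState.RUNNING.name());
        }
    }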

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
new file mode 100644
index 0000000..5a5150b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public interface ResourceFetcher {
+
+	List<Object> getResource(JobConstants.ResourceType resourceType, Object... parameter) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
new file mode 100644
index 0000000..103a2ff
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+
+public interface RunningJobCrawler {
+	
+	public void crawl() throws Exception;
+	
+	public void addIntoProcessingList(ResourceType type, JobContext context);
+	
+	public void removeFromProcessingList(ResourceType type, JobContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
new file mode 100644
index 0000000..72a340a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.job.JobFilter;
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+import org.apache.eagle.jobrunning.common.JobConstants.JobState;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+import org.apache.eagle.jobrunning.common.JobConstants.YarnApplicationType;
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.apache.eagle.jobrunning.yarn.model.AppInfo;
+import org.apache.eagle.common.DateTimeUtil;
+
+public class RunningJobCrawlerImpl implements RunningJobCrawler{
+
+	protected RunningJobCrawlConfig.RunningJobEndpointConfig endpointConfig;
+	protected RunningJobCrawlConfig.ControlConfig controlConfig;
+	protected JobFilter jobFilter;
+	private ResourceFetcher fetcher;
+	private JobRunningZKStateManager zkStateManager;
+	private Thread jobConfigProcessThread;
+	private Thread jobCompleteInfoProcessThread;
+	private Thread jobCompleteStatusCheckerThread;
+	private Thread zkCleanupThread;
+	private final RunningJobCallback callback;
+	private ReadWriteLock readWriteLock;
+	private Map<ResourceType, Map<String, JobContext>> processingJobMap = new ConcurrentHashMap<ResourceType, Map<String, JobContext>>();
+	
+	private BlockingQueue<JobContext> queueOfConfig;
+	private BlockingQueue<JobContext> queueOfCompleteJobInfo;
+	private static final int DEFAULT_CONFIG_THREAD_COUNT = 20;
+	private final long DELAY_TO_UPDATE_COMPLETION_JOB_INFO = 5 * DateUtils.MILLIS_PER_MINUTE;
+	private static final Logger LOG = LoggerFactory.getLogger(RunningJobCrawlerImpl.class);
+	
+	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+	static {
+		OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+	}
+	
+	public RunningJobCrawlerImpl(RunningJobCrawlConfig config, JobRunningZKStateManager zkStateManager, 
+			RunningJobCallback callback, JobFilter jobFilter, ReadWriteLock readWriteLock) {
+		this.endpointConfig = config.endPointConfig;
+		this.controlConfig = config.controlConfig;
+		this.callback = callback;
+		this.fetcher = new RMResourceFetcher(endpointConfig);
+		this.jobFilter = jobFilter;
+		this.readWriteLock = readWriteLock;
+		if (config.controlConfig.jobInfoEnabled) {
+			jobCompleteInfoProcessThread = new Thread() {
+				@Override
+				public void run() {
+					startCompleteJobInfoProcessThread();
+				}
+			};
+			jobCompleteInfoProcessThread.setName("JobCompleteInfo-process-thread");
+			jobCompleteInfoProcessThread.setDaemon(true);
+			
+			jobCompleteStatusCheckerThread = new Thread() {
+				@Override
+				public void run() {
+					startCompleteStatusCheckerThread();
+				}
+			};
+			jobCompleteStatusCheckerThread.setName("JobComplete-statusChecker-thread");
+			jobCompleteStatusCheckerThread.setDaemon(true);
+		}
+
+		if (config.controlConfig.jobConfigEnabled) {
+			jobConfigProcessThread = new Thread() {
+				@Override
+				public void run() {
+					startJobConfigProcessThread();
+				}
+			};
+			jobConfigProcessThread.setName("JobConfig-process-thread");
+			jobConfigProcessThread.setDaemon(true);
+		}
+				
+		zkCleanupThread = new Thread() {
+			@Override
+			public void run() {
+				startzkCleanupThread();
+			}
+		};		
+		zkCleanupThread.setName("zk-cleanup-thread");
+		zkCleanupThread.setDaemon(true);
+		
+		this.zkStateManager = zkStateManager;
+		this.processingJobMap.put(ResourceType.JOB_CONFIGURATION, new ConcurrentHashMap<String, JobContext>());
+		this.processingJobMap.put(ResourceType.JOB_COMPLETE_INFO, new ConcurrentHashMap<String, JobContext>());
+		this.queueOfConfig = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobConfigQueue);
+		this.queueOfCompleteJobInfo = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobCompletedInfoQueue);
+	}
+	
+	private void startJobConfigProcessThread() {
+		int configThreadCount = DEFAULT_CONFIG_THREAD_COUNT;
+		LOG.info("Job Config crawler main thread started, pool size: " + DEFAULT_CONFIG_THREAD_COUNT);
+
+    	ThreadFactory factory = new ThreadFactory() {
+			private final AtomicInteger count = new AtomicInteger(0);
+
+			public Thread newThread(Runnable runnable) {
+				count.incrementAndGet();
+				Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+				thread.setName("config-crawler-workthread-" + count.get());
+				return thread;
+			}
+		};
+		
+		ThreadPoolExecutor pool = new ThreadPoolExecutor(configThreadCount, configThreadCount, 0L,
+									  TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+		
+		while (true) {
+			JobContext context;
+			try {
+				context = queueOfConfig.take();
+				LOG.info("queueOfConfig size: " + queueOfConfig.size());
+				Runnable configCrawlerThread = new ConfigWorkTask(new JobContext(context), fetcher, callback, this);
+				pool.execute(configCrawlerThread);
+			} catch (InterruptedException e) {
+				LOG.warn("Got an InterruptedException: " + e.getMessage());
+			} catch (RejectedExecutionException e2) {
+				LOG.warn("Got RejectedExecutionException: " + e2.getMessage());
+			}
+			catch (Throwable t) {
+				LOG.warn("Got a Throwable: " + t.getMessage(), t);
+			}
+		}
+	}
+	
+	private void startCompleteJobInfoProcessThread() {
+		while(true) {
+			JobContext context = null;
+			try {
+				context = queueOfCompleteJobInfo.take();
+			} catch (InterruptedException ex) {
+				continue;	// nothing was taken from the queue; try again
+			}
+			/** Delay for an interval before fetching job completion info, since the history URL needs some time to become accessible.
+			 *  The default interval is 5 minutes.
+			 *  Also consider whether multiple threads are needed here.
+			 */
+			while (System.currentTimeMillis() < 
+			      context.fetchedTime + DELAY_TO_UPDATE_COMPLETION_JOB_INFO) {
+				try {
+					Thread.sleep(50);
+				} catch (InterruptedException e1) {
+				}
+			}
+			try {
+				List<Object> objs = fetcher.getResource(ResourceType.JOB_COMPLETE_INFO, JobUtils.getAppIDByJobID(context.jobId));
+				callback.onJobRunningInformation(context, ResourceType.JOB_COMPLETE_INFO, objs);
+			}
+			catch(Exception ex) {
+		        if (ex.getMessage() != null && ex.getMessage().contains("Server returned HTTP response code: 500")) {
+		        	LOG.warn("Server returned a 500 error, probably caused by the job's ACL settings; skipping this job");
+		        	// the job stays in the processing list, so we do not retry fruitlessly next round
+		        	// TODO remove it from the processing list when the job finishes, to avoid a memory leak
+		        }
+		        else LOG.error("Got an exception when fetching resource ", ex);
+			}
+		}
+	}
+	
+	public void startCompleteStatusCheckerThread() {
+		while(true) {
+			List<Object> list;
+			try {
+				list = fetcher.getResource(ResourceType.JOB_LIST, JobState.COMPLETED.name());
+				if (list == null) {
+					LOG.warn("Current Completed Job List is Empty");
+					Thread.sleep(20 * 1000);
+					continue;
+				}
+				@SuppressWarnings("unchecked")
+				List<AppInfo> apps = (List<AppInfo>)list.get(0);
+				Set<JobContext> completedJobSet = new HashSet<JobContext>();
+				for (AppInfo app : apps) {
+					// Only fetch MapReduce jobs
+					if (!YarnApplicationType.MAPREDUCE.name().equals(app.getApplicationType()) 
+					|| !jobFilter.accept(app.getUser())) {
+						continue;
+					}
+					if (System.currentTimeMillis() - app.getFinishedTime() < controlConfig.completedJobOutofDateTimeInMin * DateUtils.MILLIS_PER_MINUTE) {
+						completedJobSet.add(new JobContext(JobUtils.getJobIDByAppID(app.getId()),app.getUser(), System.currentTimeMillis()));
+					}
+				}
+				
+				if (controlConfig.jobConfigEnabled) {
+					addIntoProcessingQueueAndList(completedJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
+				}
+
+				if (controlConfig.jobInfoEnabled) {
+					addIntoProcessingQueueAndList(completedJobSet, queueOfCompleteJobInfo, ResourceType.JOB_COMPLETE_INFO);
+				}
+				Thread.sleep(20 * 1000);
+			} catch (Throwable t) {
+				LOG.error("Got a throwable while fetching the completed job list: ", t);
+			}						
+		}
+	}
+	
+	public void startzkCleanupThread() {
+		LOG.info("zk cleanup thread started");
+		while(true) {
+			try {
+				long thresholdTime = System.currentTimeMillis() - controlConfig.zkCleanupTimeInday * DateUtils.MILLIS_PER_DAY; 
+				String date = DateTimeUtil.format(thresholdTime, "yyyyMMdd");
+				zkStateManager.truncateJobBefore(ResourceType.JOB_CONFIGURATION, date);
+				zkStateManager.truncateJobBefore(ResourceType.JOB_COMPLETE_INFO, date);
+				Thread.sleep(30 * 60 * 1000);
+			}
+			catch (Throwable t) {
+				LOG.warn("Got a throwable: ", t);
+			}
+		}
+	}
+	
+	public void addIntoProcessingQueueAndList(Set<JobContext> jobSet, BlockingQueue<JobContext> queue, ResourceType type) {
+		try {
+			readWriteLock.writeLock().lock();
+			LOG.info("Write lock acquired");
+			List<String> processingList = zkStateManager.readProcessedJobs(type);
+			processingList.addAll(extractJobList(type));
+			for (JobContext context: jobSet) {
+				String jobId = context.jobId;
+				if (!processingList.contains(jobId)) {
+					addIntoProcessingList(type, context);
+					queue.add(context);
+				}
+			}
+		}
+		finally {
+			try {readWriteLock.writeLock().unlock(); LOG.info("Write lock released");}
+			catch (Throwable t) { LOG.error("Failed to release write lock", t);}
+		}
+	}
+	
+	private List<String> extractJobList(ResourceType type) {
+		Map<String, JobContext> contexts = processingJobMap.get(type);
+		return Arrays.asList(contexts.keySet().toArray(new String[0]));
+	}
+	
+	@Override
+	public void crawl() throws Exception {
+		// bring up crawler threads when crawl method is invoked first time
+		if (jobConfigProcessThread != null && !jobConfigProcessThread.isAlive()) {
+			jobConfigProcessThread.start();
+		}
+		
+		if (jobCompleteInfoProcessThread != null && !jobCompleteInfoProcessThread.isAlive()) {
+			jobCompleteInfoProcessThread.start();
+		}		
+ 
+		if (jobCompleteStatusCheckerThread != null && !jobCompleteStatusCheckerThread.isAlive()) {
+			jobCompleteStatusCheckerThread.start();
+		}
+		
+		if (!zkCleanupThread.isAlive()) {
+			zkCleanupThread.start();
+		}
+		
+		List<Object> list = fetcher.getResource(ResourceType.JOB_LIST, JobState.RUNNING.name());		
+		if (list == null) {
+			LOG.warn("Current Running Job List is Empty");
+			return;
+		}
+		
+		@SuppressWarnings("unchecked")
+		List<AppInfo> apps = (List<AppInfo>)list.get(0);
+		LOG.info("Current Running Job List size : " + apps.size());
+		Set<JobContext> currentRunningJobSet = new HashSet<JobContext>();
+		for (AppInfo app : apps) {
+			// Only fetch MapReduce jobs
+			if (!YarnApplicationType.MAPREDUCE.name().equals(app.getApplicationType()) 
+			|| !jobFilter.accept(app.getUser())) {
+				continue;
+			}
+			currentRunningJobSet.add(new JobContext(JobUtils.getJobIDByAppID(app.getId()), app.getUser(), System.currentTimeMillis()));
+		}
+		
+		if (controlConfig.jobConfigEnabled) {
+			addIntoProcessingQueueAndList(currentRunningJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
+		}
+		
+		if (controlConfig.jobInfoEnabled) {			
+			// fetch job detail & jobcounters	
+			for (JobContext context : currentRunningJobSet) {
+				try {
+					List<Object> objs = fetcher.getResource(ResourceType.JOB_RUNNING_INFO, JobUtils.getAppIDByJobID(context.jobId));
+					callback.onJobRunningInformation(context, ResourceType.JOB_RUNNING_INFO, objs);
+				}
+				catch (Exception ex) {
+			        if (ex.getMessage() != null && ex.getMessage().contains("Server returned HTTP response code: 500")) {
+			        	LOG.warn("The server returned a 500 error, probably caused by the job ACL setting; skipping this job");
+			        	// the job remains in the processing list, so we will not retry fruitlessly next round
+			        	// TODO: remove it from the processing list once the job finishes, to avoid a memory leak
+			        } else {
+			        	LOG.error("Got an exception when fetching resource, jobId: " + context.jobId, ex);
+			        }
+				}
+			}
+		}		
+	}
+
+	@Override
+	public void addIntoProcessingList(ResourceType type, JobContext context) {
+		processingJobMap.get(type).put(context.jobId, context);
+	}
+
+	@Override
+	public void removeFromProcessingList(ResourceType type, JobContext context) {
+		processingJobMap.get(type).remove(context.jobId);
+	}
+}
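
For context, the config-processing loop above follows a standard take-from-BlockingQueue, dispatch-to-thread-pool pattern. Below is a minimal standalone sketch of that pattern; the class name, queue contents, and bounded loop are illustrative only and not part of this commit.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class QueueDispatchSketch {
    public static void main(String[] args) throws InterruptedException {
        // bounded queue, as with queueOfConfig above
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        queue.put("job_1422_0001");
        queue.put("job_1422_0002");
        while (!queue.isEmpty()) {               // the crawler loops forever; bounded here so the demo exits
            final String jobId = queue.take();   // blocks until an item is available
            pool.execute(new Runnable() {
                public void run() {
                    // stands in for ConfigWorkTask: fetch and emit the job configuration
                    System.out.println("processing " + jobId);
                }
            });
        }
        pool.shutdown();
    }
}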

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
new file mode 100644
index 0000000..9814f5b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+public class XmlHelper {
+	
+	public static Map<String, String> getConfigs(InputStream is) throws IOException, SAXException, ParserConfigurationException
+	{		
+		DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+		DocumentBuilder db = dbf.newDocumentBuilder();
+		
+		Document dt = db.parse(is);
+		Element element = dt.getDocumentElement();
+		Map<String, String> config = new TreeMap<String, String>();
+				
+		NodeList propertyList = element.getElementsByTagName("property");
+		int length = propertyList.getLength();
+		for(int i = 0; i < length; i++) {
+			Node property = propertyList.item(i);
+			String key = property.getChildNodes().item(0).getTextContent();
+			String value = property.getChildNodes().item(1).getTextContent();
+			config.put(key, value);
+		}
+		return config;		
+	}
+}
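
A minimal usage sketch for the parser above. The sample XML is illustrative and assumes the Hadoop-style layout the code expects: each <property> holds <name> immediately followed by <value>, with no whitespace nodes in between.

import java.io.ByteArrayInputStream;
import java.util.Map;

import org.apache.eagle.jobrunning.crawler.XmlHelper;

public class XmlHelperUsageSketch {
    public static void main(String[] args) throws Exception {
        // compact XML, so child 0 of each <property> is <name> and child 1 is <value>
        String xml = "<configuration><property>"
                   + "<name>mapreduce.job.user.name</name><value>alice</value>"
                   + "</property></configuration>";
        Map<String, String> config =
                XmlHelper.getConfigs(new ByteArrayInputStream(xml.getBytes("UTF-8")));
        System.out.println(config);   // prints {mapreduce.job.user.name=alice}
    }
}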

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
new file mode 100644
index 0000000..2219868
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.ha;
+
+import java.io.IOException;
+
+/**
+ * @since Aug 21, 2014
+ */
+public interface HAURLSelector {
+	
+	boolean checkUrl(String url);
+		
+	void reSelectUrl() throws IOException;
+	
+	String getSelectedUrl();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
new file mode 100644
index 0000000..21a81ed
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.ha;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
+import org.apache.eagle.jobrunning.util.InputStreamUtils;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HAURLSelectorImpl implements HAURLSelector {
+
+	private final String[] urls;
+	private volatile String selectedUrl;
+	private final ServiceURLBuilder builder;
+	
+	private volatile boolean reselectInProgress;
+	private final JobConstants.CompressionType compressionType;
+	private static final long MAX_RETRY_TIME = 3;
+	private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
+	
+	public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, JobConstants.CompressionType compressionType) {
+		this.urls = urls;
+		this.compressionType = compressionType;
+		this.builder = builder;
+	}
+	
+	public boolean checkUrl(String urlString) {
+		InputStream is = null;
+		try {
+			is = InputStreamUtils.getInputStream(urlString, compressionType);
+		}
+		catch (Exception ex) {
+			LOG.info("Failed to get input stream from url: " + urlString);
+			return false;
+		}
+		finally {
+			if (is != null) { try {	is.close(); } catch (IOException e) {/*Do nothing*/} }
+		}
+		return true;
+	}
+
+	@Override
+	public String getSelectedUrl() {
+		if (selectedUrl == null) {
+			selectedUrl = urls[0];
+		}
+		return selectedUrl;
+	}
+	
+	@Override
+	public void reSelectUrl() throws IOException {
+		if (reselectInProgress) return;
+		synchronized(this) {
+			if (reselectInProgress) return;
+			reselectInProgress = true;
+			try {
+				LOG.info("Going to reselect url");
+				for (int i = 0; i < urls.length; i++) {		
+					String urlToCheck = urls[i];
+					LOG.info("Going to try url: " + urlToCheck);
+					for (int time = 0; time < MAX_RETRY_TIME; time++) {
+						if (checkUrl(builder.build(urlToCheck, JobConstants.JobState.RUNNING.name()))) {
+							selectedUrl = urls[i];
+							LOG.info("Successfully switch to new url : " + selectedUrl);
+							return;
+						}
+						LOG.info("Tried url " + urlToCheck + " and failed " + (time + 1) + " time(s); sleeping 5 seconds before trying again.");
+						try {
+							Thread.sleep(5 * 1000);
+						}
+						catch (InterruptedException ex) { /* Do Nothing */}
+					}
+				}
+				throw new IOException("No live url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
+			}
+			finally {
+				reselectInProgress = false;
+			}
+		}
+	}
+}
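
The reSelectUrl() method above uses a volatile flag plus a "check, lock, re-check" guard so that threads arriving while a re-selection is already underway return immediately instead of piling up behind the lock. A stripped-down sketch of that guard (names illustrative):

public class FailoverGuardSketch {
    private volatile boolean reselectInProgress;

    public void reselect() {
        if (reselectInProgress) return;          // fast path: another thread is already re-selecting
        synchronized (this) {
            if (reselectInProgress) return;      // re-check under the lock
            reselectInProgress = true;
            try {
                // probe candidate URLs and switch the selected one, as reSelectUrl() does above
            } finally {
                reselectInProgress = false;      // always clear the flag, even on failure
            }
        }
    }
}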

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
new file mode 100644
index 0000000..112b73d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.job.conf;
+
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+
+public interface JobConfParser {
+	
+	Map<String, String> parse(Document doc);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
new file mode 100644
index 0000000..7a97550
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.job.conf;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+import org.jsoup.nodes.Element;
+import org.jsoup.select.Elements;
+
+public class JobConfParserImpl implements JobConfParser {
+	
+	public Map<String, String> parse(Document doc) {
+		Elements elements = doc.select("table[id=conf]").select("tbody").select("tr");
+		Iterator<Element> iter = elements.iterator();
+		Map<String, String> configs = new HashMap<String, String>();
+		while(iter.hasNext()) {
+			Element element = iter.next();
+			Elements tds = element.children();
+			String key = tds.get(0).text();
+			String value = tds.get(1).text();
+			configs.put(key, value);
+		}
+		return configs;
+	}
+}
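
A small usage sketch for the parser above. The HTML fragment is illustrative and mimics the MapReduce web UI's configuration page: a table with id "conf" whose rows hold the property name in the first cell and its value in the second.

import java.util.Map;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import org.apache.eagle.jobrunning.job.conf.JobConfParserImpl;

public class JobConfParserSketch {
    public static void main(String[] args) {
        String html = "<html><body><table id=\"conf\"><tbody>"
                    + "<tr><td>hive.query.string</td><td>select 1</td></tr>"
                    + "</tbody></table></body></html>";
        Document doc = Jsoup.parse(html);
        Map<String, String> configs = new JobConfParserImpl().parse(doc);
        System.out.println(configs);   // prints {hive.query.string=select 1}
    }
}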

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
new file mode 100644
index 0000000..3e601e1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Defines which content in the running-job stream should be streamed.
+ */
+public interface JobRunningContentFilter extends Serializable {
+	boolean acceptJobConf(Map<String, String> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
new file mode 100644
index 0000000..b00f195
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.Map;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Defines which content in the running-job stream should be streamed.
+ */
+public class JobRunningContentFilterImpl implements JobRunningContentFilter {
+	private static final long serialVersionUID = 1L;
+	
+	@Override
+	public boolean acceptJobConf(Map<String, String> config) {
+		if (config.containsKey(JobConstants.HIVE_QUERY_STRING)) {
+			return true;
+		}
+		return false;
+	}
+}
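
The filter above simply keeps job configurations that carry a Hive query string. A minimal illustration, assuming JobConstants.HIVE_QUERY_STRING is the key under which the configuration map stores that value:

import java.util.HashMap;
import java.util.Map;

import org.apache.eagle.jobrunning.common.JobConstants;
import org.apache.eagle.jobrunning.storm.JobRunningContentFilter;
import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;

public class ContentFilterSketch {
    public static void main(String[] args) {
        JobRunningContentFilter filter = new JobRunningContentFilterImpl();

        Map<String, String> hiveJobConf = new HashMap<String, String>();
        hiveJobConf.put(JobConstants.HIVE_QUERY_STRING, "select count(*) from t");
        System.out.println(filter.acceptJobConf(hiveJobConf));   // true: Hive query present

        Map<String, String> otherJobConf = new HashMap<String, String>();
        System.out.println(filter.acceptJobConf(otherJobConf));  // false: no Hive query string
    }
}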

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
new file mode 100644
index 0000000..3dd55c1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.eagle.jobrunning.callback.DefaultRunningJobInputStreamCallback;
+import org.apache.eagle.jobrunning.callback.RunningJobMessageId;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.eagle.jobrunning.crawler.RunningJobCrawler;
+import org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl;
+import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.crawler.JobContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+
+import org.apache.eagle.job.JobFilter;
+import org.apache.eagle.job.JobFilterByPartition;
+import org.apache.eagle.job.JobPartitioner;
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+
+public class JobRunningSpout extends BaseRichSpout {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(JobRunningSpout.class);
+
+	private RunningJobCrawlConfig config;
+	private JobRunningZKStateManager zkStateManager;
+	private transient RunningJobCrawler crawler;
+	private JobRunningSpoutCollectorInterceptor interceptor;
+	private RunningJobCallback callback;
+	private ReadWriteLock readWriteLock;
+    private static final int DEFAULT_WAIT_SECONDS_BETWEEN_ROUNDS = 10;
+
+    public JobRunningSpout(RunningJobCrawlConfig config){
+		this(config, new JobRunningSpoutCollectorInterceptor());
+	}
+	
+	/**
+	 * This constructor exists mainly for unit testing, since a customized interceptor can be injected here
+	 * @param config
+	 * @param interceptor
+	 */
+	public JobRunningSpout(RunningJobCrawlConfig config, JobRunningSpoutCollectorInterceptor interceptor){
+		this.config = config;
+		this.interceptor = interceptor;
+		this.callback = new DefaultRunningJobInputStreamCallback(interceptor);
+		this.readWriteLock = new ReentrantReadWriteLock();
+	}
+	
+	
+	/**
+	 * TODO: this is copied from JobHistorySpout and should be moved to a common place
+	 * @param context
+	 * @return
+	 */
+	private int calculatePartitionId(TopologyContext context){
+		int thisGlobalTaskId = context.getThisTaskId();
+		String componentName = context.getComponentId(thisGlobalTaskId);
+		List<Integer> globalTaskIds = context.getComponentTasks(componentName);
+		int index = 0;
+		for(Integer id : globalTaskIds){
+			if(id == thisGlobalTaskId){
+				return index;
+			}
+			index++;
+		}
+		throw new IllegalStateException();
+	}
+	
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		int partitionId = calculatePartitionId(context);
+		// sanity check: 0 <= partitionId <= numTotalPartitions - 1
+		if(partitionId < 0 || partitionId >= config.controlConfig.numTotalPartitions){
+			throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + 
+					partitionId + " and numTotalPartitions " + config.controlConfig.numTotalPartitions);
+		}
+		Class<? extends JobPartitioner> partitionerCls = config.controlConfig.partitionerCls;
+		JobPartitioner partitioner = null;
+		try {
+			partitioner = partitionerCls.newInstance();
+		} catch (Exception e) {
+			LOG.error("failed to instantiate job partitioner class " + partitionerCls.getCanonicalName());
+			throw new IllegalStateException(e);
+		}
+		JobFilter jobFilter = new JobFilterByPartition(partitioner, config.controlConfig.numTotalPartitions, partitionId);
+		interceptor.setSpoutOutputCollector(collector);		
+		try {
+			zkStateManager = new JobRunningZKStateManager(config);
+			crawler = new RunningJobCrawlerImpl(config, zkStateManager, callback, jobFilter, readWriteLock);
+		} catch (Exception e) {
+			LOG.error("failed to create crawler driver");
+			throw new IllegalStateException(e);
+		}
+	}
+
+	@Override
+	public void nextTuple() {
+		try{
+			crawler.crawl();
+		}catch(Exception ex){
+			LOG.error("failed to crawl running jobs, continuing ...", ex);
+		}
+        try {
+            Thread.sleep(DEFAULT_WAIT_SECONDS_BETWEEN_ROUNDS * 1000);
+        } catch (Exception x) {
+        }
+    }
+	
+	/**
+	 * empty because the framework takes care of output field declaration
+	 */
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		
+	}
+	
+	/**
+	 * add the job to the processed list
+	 */
+	@Override
+    public void ack(Object msgId) {
+		RunningJobMessageId messageId = (RunningJobMessageId) msgId;
+		JobConstants.ResourceType type = messageId.type;
+		LOG.info("Ack on messageId: " + messageId.toString());
+		switch(type) {
+			case JOB_CONFIGURATION:
+			case JOB_COMPLETE_INFO:
+				/** Lock so the processed/processing job lists stay unchanged while the crawler computes the last round's running job list */
+				try {
+					readWriteLock.readLock().lock();
+					zkStateManager.addProcessedJob(type, messageId.jobID);
+					// Here username & timestamp are meaningless, set to null
+					crawler.removeFromProcessingList(type, new JobContext(messageId.jobID, null, null));
+				}
+				finally {
+					try {readWriteLock.readLock().unlock(); LOG.info("Read lock released");}
+					catch (Throwable t) { LOG.error("Failed to release read lock", t);}
+				}
+				break;				
+			default:
+				break;
+		}	
+    }
+
+	/**
+	 * the job was not fully processed; remove it from the processing list so it can be crawled again
+	 */
+    @Override
+    public void fail(Object msgId) {
+		RunningJobMessageId messageId = (RunningJobMessageId) msgId;
+		JobConstants.ResourceType type = messageId.type;
+		// Here timestamp is meaningless, set to null
+		if (type.equals(JobConstants.ResourceType.JOB_COMPLETE_INFO) || type.equals(JobConstants.ResourceType.JOB_CONFIGURATION)) {
+			try {
+				readWriteLock.readLock().lock();
+				// Here username and timestamp are not used, set to null
+				crawler.removeFromProcessingList(type, new JobContext(messageId.jobID, null, null));
+			}
+			finally {
+				try {readWriteLock.readLock().unlock(); LOG.info("Read lock released");}
+				catch (Throwable t) { LOG.error("Failed to release read lock", t);}
+			}
+		}
+    }
+   
+    @Override
+    public void deactivate() {
+    	
+    }
+   
+    @Override
+    public void close() {
+    	
+    }
+}
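
For orientation, a sketch of how this spout would typically be wired into a Storm topology. The component name and parallelism hint are illustrative, and how RunningJobCrawlConfig is built is out of scope here.

import backtype.storm.topology.TopologyBuilder;

import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
import org.apache.eagle.jobrunning.storm.JobRunningSpout;

public class JobRunningTopologySketch {
    public static TopologyBuilder wire(RunningJobCrawlConfig crawlConfig) {
        TopologyBuilder builder = new TopologyBuilder();
        // each of the 4 spout tasks computes its own partition id in open()
        // and only crawls the jobs its JobFilterByPartition accepts
        builder.setSpout("jobRunningSpout", new JobRunningSpout(crawlConfig), 4);
        // downstream bolts (e.g. persistence or alerting) would be attached here
        return builder;
    }
}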

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
new file mode 100644
index 0000000..d65ad93
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import backtype.storm.spout.SpoutOutputCollector;
+
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jobrunning.callback.RunningJobMessageId;
+
+public class JobRunningSpoutCollectorInterceptor implements EagleOutputCollector{
+
+	private static final long serialVersionUID = 1L;
+	private SpoutOutputCollector collector;
+	
+	public void setSpoutOutputCollector(SpoutOutputCollector collector){
+		this.collector = collector;
+	}
+
+	@Override
+	public void collect(ValuesArray t) {
+		// the first value is fixed as messageId
+		RunningJobMessageId messageId = (RunningJobMessageId) t.get(0);
+		List<Object> list = new ArrayList<Object>();
+		for (int i = 1; i < t.size(); i++) {
+			list.add(t.get(i));
+		}
+		collector.emit(list, messageId);
+	}
+}