You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:46 UTC

[39/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
new file mode 100644
index 0000000..48c7dac
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobCompleteCounterServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the job-history counters URL of a completed job.
+	 * <p>
+	 * Shape: {historyUrl}jobhistory/jobcounters/{jobID}?{ANONYMOUS_PARAMETER},
+	 * e.g. {historyUrl}/jobhistory/jobcounters/job_xxxxxxxxxxxxx_xxxxx?anonymous=true.
+	 * No separator is inserted after the base URL, so it must already end with "/".
+	 *
+	 * @param parameters parameters[0] = history server base URL, parameters[1] = job ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String historyBaseUrl = parameters[0];
+		final String jobId = parameters[1];
+		final String countersPath = "jobhistory/jobcounters/" + jobId;
+		return historyBaseUrl + countersPath + "?" + JobConstants.ANONYMOUS_PARAMETER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
new file mode 100644
index 0000000..9c4e9ba
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobCompleteDetailServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the detail URL of a completed job, addressed by its application ID.
+	 * <p>
+	 * Shape: {baseUrl}{V2_COMPLETE_APPS_URL}{appID}?{ANONYMOUS_PARAMETER}. The job ID
+	 * is converted with JobUtils.getAppIDByJobID before it is appended, so the path
+	 * carries the application ID, not the job ID that was passed in.
+	 * NOTE(review): getAppIDByJobID returns null for an ID without the job prefix, which
+	 * puts the literal text "null" into the URL - confirm callers only pass job IDs.
+	 *
+	 * @param parameters parameters[0] = base URL, parameters[1] = job ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String baseUrl = parameters[0];
+		final String appId = JobUtils.getAppIDByJobID(parameters[1]);
+		final String query = "?" + JobConstants.ANONYMOUS_PARAMETER;
+		return baseUrl + JobConstants.V2_COMPLETE_APPS_URL + appId + query;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
new file mode 100644
index 0000000..077cb0c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+
+public class JobCompletedConfigServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the job-history configuration URL of a completed job.
+	 * <p>
+	 * Shape: {historyUrl}jobhistory/conf/{jobID}, e.g.
+	 * {historyUrl}/jobhistory/conf/job_xxxxxxxxxxxxx_xxxxxx. No query string is appended,
+	 * and no separator is inserted after the base URL, so it must already end with "/".
+	 *
+	 * @param parameters parameters[0] = history server base URL, parameters[1] = job ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String historyBaseUrl = parameters[0];
+		final String jobId = parameters[1];
+		return historyBaseUrl + ("jobhistory/conf/" + jobId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
new file mode 100644
index 0000000..ad2aadd
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobCountersServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the counters URL of a running job, reached through the resource manager proxy.
+	 * <p>
+	 * Shape: {rmUrl}{V2_PROXY_PREFIX_URL}{appID}{V2_MR_APPMASTER_PREFIX}{jobID}{V2_MR_COUNTERS_URL}?{ANONYMOUS_PARAMETER},
+	 * e.g. {rmUrl}/proxy/application_xxxxxxxxxxxxx_xxxxxx/ws/v1/mapreduce/jobs/job_xxxxxxxxxxxxx_xxxxxx/counters?anonymous=true.
+	 * The job ID is derived from the application ID with JobUtils.getJobIDByAppID.
+	 * NOTE(review): that helper returns null for an ID without the application prefix,
+	 * which puts the literal text "null" into the URL - confirm callers only pass app IDs.
+	 *
+	 * @param parameters parameters[0] = base URL, parameters[1] = application ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String rmBaseUrl = parameters[0];
+		final String appId = parameters[1];
+		final String proxied = rmBaseUrl + JobConstants.V2_PROXY_PREFIX_URL + appId;
+		final String jobId = JobUtils.getJobIDByAppID(appId);
+		return proxied + JobConstants.V2_MR_APPMASTER_PREFIX + jobId
+				+ JobConstants.V2_MR_COUNTERS_URL + "?" + JobConstants.ANONYMOUS_PARAMETER;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
new file mode 100644
index 0000000..ead9077
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobDetailServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the job detail URL of an application, reached through the resource manager proxy.
+	 * <p>
+	 * Shape: {rmUrl}{V2_PROXY_PREFIX_URL}{appID}{V2_APP_DETAIL_URL}?{ANONYMOUS_PARAMETER},
+	 * e.g. {rmUrl}/proxy/application_xxx/ws/v1/mapreduce/jobs?anonymous=true.
+	 *
+	 * @param parameters parameters[0] = base URL, parameters[1] = application ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String... parameters) {
+		final String rmBaseUrl = parameters[0];
+		final String appId = parameters[1];
+		final String proxied = rmBaseUrl + JobConstants.V2_PROXY_PREFIX_URL + appId;
+		return proxied + JobConstants.V2_APP_DETAIL_URL
+				+ "?" + JobConstants.ANONYMOUS_PARAMETER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
new file mode 100644
index 0000000..92cb09a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the resource manager URL that lists applications in a given state,
+	 * e.g. {rmUrl}/ws/v1/cluster/apps?state=RUNNING followed by the anonymous parameter.
+	 *
+	 * @param parameters parameters[0] = resource manager base URL,
+	 *                   parameters[1] = name of a JobConstants.JobState (RUNNING, COMPLETED or ALL)
+	 * @return the URL, or null when the state is null or not one of the three supported names
+	 */
+	public String build(String ... parameters) {
+		final String rmUrl = parameters[0];
+		final String jobState = parameters[1];
+		final String appsPath;
+		// Constant-first equals keeps a null state on the "unsupported" path (null result)
+		// instead of failing with a NullPointerException.
+		if (JobConstants.JobState.RUNNING.name().equals(jobState)) {
+			appsPath = JobConstants.V2_APPS_RUNNING_URL;
+		}
+		else if (JobConstants.JobState.COMPLETED.name().equals(jobState)) {
+			appsPath = JobConstants.V2_APPS_COMPLETED_URL;
+		}
+		else if (JobConstants.JobState.ALL.name().equals(jobState)) {
+			appsPath = JobConstants.V2_APPS_URL;
+		}
+		else {
+			return null;
+		}
+		// Extend an existing query string with "&"; start one with "?" when the path
+		// constant carries no query yet, so the result is never ".../apps&anonymous=true".
+		final String separator = appsPath.indexOf('?') >= 0 ? "&" : "?";
+		return rmUrl + appsPath + separator + JobConstants.ANONYMOUS_PARAMETER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
new file mode 100644
index 0000000..bc883d4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobRunningConfigServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the configuration URL of a running job, reached through the resource manager proxy.
+	 * <p>
+	 * Shape: {baseUrl}{V2_PROXY_PREFIX_URL}{appID}{V2_MR_APPMASTER_PREFIX}{jobID}{V2_CONF_URL}?{ANONYMOUS_PARAMETER},
+	 * e.g. {baseUrl}/proxy/application_xxxxxxxxxxxxx_xxxxx/ws/v1/mapreduce/jobs/job_xxxxxxxxxxxxx_xxxxx/conf
+	 * followed by the anonymous query parameter.
+	 * The application ID is derived from the job ID with JobUtils.getAppIDByJobID.
+	 * NOTE(review): that helper returns null for an ID without the job prefix, which
+	 * puts the literal text "null" into the URL - confirm callers only pass job IDs.
+	 *
+	 * @param parameters parameters[0] = base URL, parameters[1] = job ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String baseUrl = parameters[0];
+		final String jobId = parameters[1];
+		final String appId = JobUtils.getAppIDByJobID(jobId);
+		return baseUrl + JobConstants.V2_PROXY_PREFIX_URL + appId
+				+ JobConstants.V2_MR_APPMASTER_PREFIX + jobId
+				+ JobConstants.V2_CONF_URL + "?" + JobConstants.ANONYMOUS_PARAMETER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
new file mode 100644
index 0000000..bfe162c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobStatusServiceURLBuilderImpl implements ServiceURLBuilder {
+
+	/**
+	 * Builds the resource manager URL reporting the status of a single application.
+	 * <p>
+	 * Shape: {rmUrl}ws/v1/cluster/apps/{appID}?{ANONYMOUS_PARAMETER},
+	 * e.g. {rmUrl}/ws/v1/cluster/apps/application_xxxxxxxxxxxxx_xxxxx?anonymous=true.
+	 * No separator is inserted after the base URL, so it must already end with "/".
+	 *
+	 * @param parameters parameters[0] = resource manager base URL, parameters[1] = application ID
+	 * @return the concatenated URL string
+	 */
+	public String build(String ... parameters) {
+		final String rmUrl = parameters[0];
+		final String appId = parameters[1];
+		final String appPath = "ws/v1/cluster/apps/" + appId;
+		return rmUrl + appPath + "?" + JobConstants.ANONYMOUS_PARAMETER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
new file mode 100644
index 0000000..5a50462
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+/**
+ * Assembles the URL of one REST endpoint from positional string parameters.
+ * The meaning of each position is defined by the implementing class.
+ */
+public interface ServiceURLBuilder {
+	/**
+	 * Builds the endpoint URL.
+	 *
+	 * @param parameters positional inputs, interpreted by the implementation
+	 * @return the URL as a string; NOTE(review): at least one implementation
+	 *         (JobListServiceURLBuilderImpl) returns null for input it does not
+	 *         support, so callers should be prepared for null
+	 */
+	String build(String ... parameters);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
new file mode 100644
index 0000000..62a15af
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Helpers for opening an input stream on a URL, optionally with gzip decompression.
+ */
+public class InputStreamUtils {
+
+	// Connect timeout applied to every connection, in milliseconds.
+	private static final int CONNECTION_TIMEOUT = 10 * 1000;
+	// Read timeout in milliseconds used when the caller does not supply one.
+	private static final int READ_TIMEOUT = 5 * 60 * 1000;
+	private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+	private static final String GZIP_COMPRESSION = "gzip";
+
+	/**
+	 * Opens a connection with the shared connect timeout and the given read timeout,
+	 * so the plain and the gzip path are configured identically.
+	 */
+	private static URLConnection openConnection(URL url, int timeout) throws IOException {
+		final URLConnection connection = url.openConnection();
+		connection.setConnectTimeout(CONNECTION_TIMEOUT);
+		connection.setReadTimeout(timeout);
+		return connection;
+	}
+
+	/**
+	 * Requests gzip encoding and wraps the response in a GZIPInputStream.
+	 * The raw stream is closed if the gzip header cannot be read, so a failed wrap
+	 * does not leak the underlying connection stream.
+	 */
+	private static InputStream openGZIPInputStream(URL url, int timeout) throws IOException {
+		final URLConnection connection = openConnection(url, timeout);
+		connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
+		final InputStream raw = connection.getInputStream();
+		try {
+			return new GZIPInputStream(raw);
+		} catch (IOException e) {
+			try {
+				raw.close();
+			} catch (IOException ignored) {
+				// the original failure is the one worth reporting
+			}
+			throw e;
+		}
+	}
+
+	/**
+	 * Opens the response stream without decompression.
+	 * Previously the caller's value was applied as the connect timeout and no read
+	 * timeout was set at all; it is now the read timeout, matching the gzip path.
+	 */
+	private static InputStream openInputStream(URL url, int timeout) throws IOException {
+		return openConnection(url, timeout).getInputStream();
+	}
+
+	/**
+	 * Opens an input stream on the given URL.
+	 *
+	 * @param urlString       URL to read, resolved through URLConnectionUtils.getUrl
+	 * @param compressionType GZIP to request and decode gzip content; any other value
+	 *                        (including null) yields the plain stream
+	 * @param timeout         read timeout in milliseconds
+	 * @return the opened stream; the caller is responsible for closing it
+	 * @throws Exception if the URL is rejected or the connection cannot be opened
+	 */
+	public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType, int timeout) throws Exception {
+		final URL url = URLConnectionUtils.getUrl(urlString);
+		// Constant-first comparison: a null compression type falls back to the plain stream.
+		if (JobConstants.CompressionType.GZIP.equals(compressionType)) {
+			return openGZIPInputStream(url, timeout);
+		}
+		return openInputStream(url, timeout); // CompressionType.NONE
+	}
+
+	/**
+	 * Opens an input stream on the given URL using the default read timeout.
+	 *
+	 * @param urlString       URL to read
+	 * @param compressionType GZIP to request and decode gzip content, otherwise plain
+	 * @return the opened stream; the caller is responsible for closing it
+	 * @throws Exception if the URL is rejected or the connection cannot be opened
+	 */
+	public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType) throws Exception {
+		return getInputStream(urlString, compressionType, READ_TIMEOUT);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
new file mode 100644
index 0000000..9af37ca
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * String helpers for base URLs and for converting between job IDs and application IDs.
+ */
+public class JobUtils {
+
+	/**
+	 * Ensures the given base URL ends with a slash.
+	 *
+	 * @param urlBase base URL, must not be null
+	 * @return urlBase unchanged if it already ends with "/", otherwise urlBase + "/"
+	 */
+	public static String checkAndAddLastSlash(String urlBase) {
+		return urlBase.endsWith("/") ? urlBase : urlBase + "/";
+	}
+
+	/**
+	 * Converts an application ID into the matching job ID by swapping the leading prefix.
+	 *
+	 * @param appID application ID, must not be null
+	 * @return the job ID, or null when appID does not start with the application prefix
+	 */
+	public static String getJobIDByAppID(String appID) {
+		return swapPrefix(appID, JobConstants.APPLICATION_PREFIX, JobConstants.JOB_PREFIX);
+	}
+
+	/**
+	 * Converts a job ID into the matching application ID by swapping the leading prefix.
+	 *
+	 * @param jobID job ID, must not be null
+	 * @return the application ID, or null when jobID does not start with the job prefix
+	 */
+	public static String getAppIDByJobID(String jobID) {
+		return swapPrefix(jobID, JobConstants.JOB_PREFIX, JobConstants.APPLICATION_PREFIX);
+	}
+
+	/**
+	 * Replaces only the leading fromPrefix of id with toPrefix. String.replace would
+	 * also rewrite later occurrences of the prefix text inside the ID.
+	 */
+	private static String swapPrefix(String id, String fromPrefix, String toPrefix) {
+		if (!id.startsWith(fromPrefix)) {
+			return null;
+		}
+		return toPrefix + id.substring(fromPrefix.length());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
new file mode 100644
index 0000000..314fb0a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class URLConnectionUtils {
+	//TODO: change some public method to private
+    private static final Logger LOG = LoggerFactory.getLogger(URLConnectionUtils.class);
+	
+	public static URLConnection getConnection(String url) throws Exception {
+		if (url.startsWith("https://")) {
+			return getHTTPSConnection(url);
+		} else if (url.startsWith("http://")) {
+			return getHTTPConnection(url);
+		}
+		throw new Exception("Invalid input argument url: " + url);
+	}
+
+	public static URLConnection getHTTPConnection(String urlString) throws Exception {
+		final URL url = new URL(urlString);
+		return url.openConnection();
+	}
+
+	public static URL getUrl(String urlString) throws Exception  {
+		if(urlString.toLowerCase().contains("https")){
+			return getHTTPSUrl(urlString);
+		}else if (urlString.toLowerCase().contains("http")) {
+			return getURL(urlString);
+		}
+		throw new Exception("Invalid input argument url: " + urlString);
+	}
+	
+	public static URL getURL(String urlString) throws MalformedURLException {
+		return new URL(urlString);
+	}
+	
+	public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException  {
+    	// Create a trust manager that does not validate certificate chains   
+        final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
+        // Install the all-trusting trust manager   
+        final SSLContext sc = SSLContext.getInstance("SSL");   
+        sc.init(null, trustAllCerts, new java.security.SecureRandom());   
+        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());   
+        // Create all-trusting host name verifier   
+        final HostnameVerifier allHostsValid = new HostnameVerifier() {   
+            public boolean verify(String hostname, SSLSession session) {   
+                return true;   
+            }   
+        };
+        HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
+        return new URL(urlString);
+	}
+
+	public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException  {
+       	final URL url = getHTTPSUrl(urlString);
+       	return url.openConnection();
+	}
+	
+	public static class TrustAllX509TrustManager implements X509TrustManager {
+		@Override
+		public void checkClientTrusted(
+				java.security.cert.X509Certificate[] chain, String authType)
+				throws CertificateException {
+		}
+
+		@Override
+		public void checkServerTrusted(
+				java.security.cert.X509Certificate[] chain, String authType)
+				throws CertificateException {
+		}
+
+		@Override
+		public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
new file mode 100644
index 0000000..85a717d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class App { // Mutable bean for one application record; carried by JobCompleteWrapper
+	private String id;
+	private String user;
+	private String name;
+	private String queue;
+	private String state;
+	private String finalStatus;
+	private double progress; // NOTE(review): scale (0-1 vs 0-100) is not visible here - confirm
+	private String trackingUI;
+	private String trackingUrl;
+	private String diagnostics;
+	private String clusterId;
+	private String applicationType;
+	private long startedTime; // presumably epoch milliseconds - TODO confirm
+	private long finishedTime; // presumably epoch milliseconds - TODO confirm
+	private long elapsedTime; // presumably a duration in milliseconds - TODO confirm
+	private String amContainerLogs;
+	private String amHostHttpAddress;
+	
+	public String getId() {
+		return id;
+	}
+	public void setId(String id) {
+		this.id = id;
+	}
+	public String getUser() {
+		return user;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	public String getQueue() {
+		return queue;
+	}
+	public void setQueue(String queue) {
+		this.queue = queue;
+	}
+	public String getState() {
+		return state;
+	}
+	public void setState(String state) {
+		this.state = state;
+	}
+	public String getFinalStatus() {
+		return finalStatus;
+	}
+	public void setFinalStatus(String finalStatus) {
+		this.finalStatus = finalStatus;
+	}
+	public double getProgress() {
+		return progress;
+	}
+	public void setProgress(double progress) {
+		this.progress = progress;
+	}
+	public String getTrackingUI() {
+		return trackingUI;
+	}
+	public void setTrackingUI(String trackingUI) {
+		this.trackingUI = trackingUI;
+	}
+	public String getTrackingUrl() {
+		return trackingUrl;
+	}
+	public void setTrackingUrl(String trackingUrl) {
+		this.trackingUrl = trackingUrl;
+	}
+	public String getDiagnostics() {
+		return diagnostics;
+	}
+	public void setDiagnostics(String diagnostics) {
+		this.diagnostics = diagnostics;
+	}
+	public String getClusterId() {
+		return clusterId;
+	}
+	public void setClusterId(String clusterId) {
+		this.clusterId = clusterId;
+	}
+	public String getApplicationType() {
+		return applicationType;
+	}
+	public void setApplicationType(String applicationType) {
+		this.applicationType = applicationType;
+	}
+	public long getStartedTime() {
+		return startedTime;
+	}
+	public void setStartedTime(long startedTime) {
+		this.startedTime = startedTime;
+	}
+	public long getFinishedTime() {
+		return finishedTime;
+	}
+	public void setFinishedTime(long finishedTime) {
+		this.finishedTime = finishedTime;
+	}
+	public long getElapsedTime() {
+		return elapsedTime;
+	}
+	public void setElapsedTime(long elapsedTime) {
+		this.elapsedTime = elapsedTime;
+	}
+	public String getAmContainerLogs() {
+		return amContainerLogs;
+	}
+	public void setAmContainerLogs(String amContainerLogs) {
+		this.amContainerLogs = amContainerLogs;
+	}
+	public String getAmHostHttpAddress() {
+		return amHostHttpAddress;
+	}
+	public void setAmHostHttpAddress(String amHostHttpAddress) {
+		this.amHostHttpAddress = amHostHttpAddress;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
new file mode 100644
index 0000000..d1a4407
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class AppInfo { // Mutable bean for one application record; used by AppWrapper and Applications
+	String id; // NOTE(review): fields here are package-private, while the otherwise identical App declares them private - confirm intent
+	String user;
+	String name;
+	String queue;
+	String state;
+	String finalStatus;
+	double progress; // NOTE(review): scale (0-1 vs 0-100) is not visible here - confirm
+	String trackingUI;
+	String trackingUrl;
+	String diagnostics;
+	String clusterId;
+	String applicationType;
+	long startedTime; // presumably epoch milliseconds - TODO confirm
+	long finishedTime; // presumably epoch milliseconds - TODO confirm
+	long elapsedTime; // presumably a duration in milliseconds - TODO confirm
+	String amContainerLogs;
+	String amHostHttpAddress;
+	
+	public String getId() {
+		return id;
+	}
+	public void setId(String id) {
+		this.id = id;
+	}
+	public String getUser() {
+		return user;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	public String getQueue() {
+		return queue;
+	}
+	public void setQueue(String queue) {
+		this.queue = queue;
+	}
+	public String getState() {
+		return state;
+	}
+	public void setState(String state) {
+		this.state = state;
+	}
+	public String getFinalStatus() {
+		return finalStatus;
+	}
+	public void setFinalStatus(String finalStatus) {
+		this.finalStatus = finalStatus;
+	}
+	public double getProgress() {
+		return progress;
+	}
+	public void setProgress(double progress) {
+		this.progress = progress;
+	}
+	public String getTrackingUI() {
+		return trackingUI;
+	}
+	public void setTrackingUI(String trackingUI) {
+		this.trackingUI = trackingUI;
+	}
+	public String getTrackingUrl() {
+		return trackingUrl;
+	}
+	public void setTrackingUrl(String trackingUrl) {
+		this.trackingUrl = trackingUrl;
+	}
+	public String getDiagnostics() {
+		return diagnostics;
+	}
+	public void setDiagnostics(String diagnostics) {
+		this.diagnostics = diagnostics;
+	}
+	public String getClusterId() {
+		return clusterId;
+	}
+	public void setClusterId(String clusterId) {
+		this.clusterId = clusterId;
+	}
+	public String getApplicationType() {
+		return applicationType;
+	}
+	public void setApplicationType(String applicationType) {
+		this.applicationType = applicationType;
+	}
+	public long getStartedTime() {
+		return startedTime;
+	}
+	public void setStartedTime(long startedTime) {
+		this.startedTime = startedTime;
+	}
+	public long getFinishedTime() {
+		return finishedTime;
+	}
+	public void setFinishedTime(long finishedTime) {
+		this.finishedTime = finishedTime;
+	}
+	public long getElapsedTime() {
+		return elapsedTime;
+	}
+	public void setElapsedTime(long elapsedTime) {
+		this.elapsedTime = elapsedTime;
+	}
+	public String getAmContainerLogs() {
+		return amContainerLogs;
+	}
+	public void setAmContainerLogs(String amContainerLogs) {
+		this.amContainerLogs = amContainerLogs;
+	}
+	public String getAmHostHttpAddress() {
+		return amHostHttpAddress;
+	}
+	public void setAmHostHttpAddress(String amHostHttpAddress) {
+		this.amHostHttpAddress = amHostHttpAddress;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
new file mode 100644
index 0000000..1f807d1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class AppWrapper { // Envelope bean exposing a single "app" property
+
+	private AppInfo app; // the wrapped application record; stays null until setApp is called
+
+	public AppInfo getApp() {
+		return app;
+	}
+
+	public void setApp(AppInfo app) {
+		this.app = app;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
new file mode 100644
index 0000000..b76ee6b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class Applications { // Bean holding a list of AppInfo records under the "app" property; wrapped by AppsWrapper
+
+	private List<AppInfo> app; // stays null until setApp is called, so callers of getApp must handle null
+
+	public List<AppInfo> getApp() {
+		return app;
+	}
+
+	public void setApp(List<AppInfo> app) {
+		this.app = app;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
new file mode 100644
index 0000000..79abd29
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class AppsWrapper { // Envelope bean exposing a single "apps" property
+	
+	private Applications apps; // the wrapped Applications list holder; stays null until setApps is called
+
+	public Applications getApps() {
+		return apps;
+	}
+
+	public void setApps(Applications apps) {
+		this.apps = apps;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
new file mode 100644
index 0000000..afc487e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+public class Counter { // Mutable bean for one named counter with separate total, map and reduce values
+
+	private String name;
+	private long totalCounterValue;
+	private long mapCounterValue;
+	private long reduceCounterValue; // NOTE(review): nothing here enforces total == map + reduce - confirm with the data source
+	
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	public long getTotalCounterValue() {
+		return totalCounterValue;
+	}
+	public void setTotalCounterValue(long totalCounterValue) {
+		this.totalCounterValue = totalCounterValue;
+	}
+	public long getMapCounterValue() {
+		return mapCounterValue;
+	}
+	public void setMapCounterValue(long mapCounterValue) {
+		this.mapCounterValue = mapCounterValue;
+	}
+	public long getReduceCounterValue() {
+		return reduceCounterValue;
+	}
+	public void setReduceCounterValue(long reduceCounterValue) {
+		this.reduceCounterValue = reduceCounterValue;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
new file mode 100644
index 0000000..22e498c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+public class CounterGroup { // Mutable bean pairing a group name with the Counter entries in that group
+
+	private String counterGroupName;
+	private List<Counter> counter; // stays null until setCounter is called, so callers of getCounter must handle null
+	
+	public String getCounterGroupName() {
+		return counterGroupName;
+	}
+	public void setCounterGroupName(String counterGroupName) {
+		this.counterGroupName = counterGroupName;
+	}
+	public List<Counter> getCounter() {
+		return counter;
+	}
+	public void setCounter(List<Counter> counter) {
+		this.counter = counter;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
new file mode 100644
index 0000000..124dc65
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when serializing
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties without a matching field are skipped, not rejected
+public class JobCompleteWrapper { // Envelope bean exposing a single "app" property typed as App (not AppInfo)
+	private App app; // the wrapped application record; stays null until setApp is called
+
+	public App getApp() {
+		return app;
+	}
+
+	public void setApp(App app) {
+		this.app = app;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
new file mode 100644
index 0000000..76b722c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+public class JobCounters { // Mutable bean pairing an id with its CounterGroup list; distinct from counter.JobCounters, which JobDetailInfo imports
+
+	private String id;
+	private List<CounterGroup> counterGroup; // stays null until setCounterGroup is called, so callers must handle null
+	
+	public String getId() {
+		return id;
+	}
+	public void setId(String id) {
+		this.id = id;
+	}
+	public List<CounterGroup> getCounterGroup() {
+		return counterGroup;
+	}
+	public void setCounterGroup(List<CounterGroup> counterGroup) {
+		this.counterGroup = counterGroup;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
new file mode 100644
index 0000000..f3bb136
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+
+public class JobCountersWrapper { // Envelope bean exposing a single "jobCounters" property
+
+	private JobCounters jobCounters; // resolves to the same-package yarn.model.JobCounters (this file has no imports)
+
+	public JobCounters getJobCounters() {
+		return jobCounters;
+	}
+
+	public void setJobCounters(JobCounters jobCounters) {
+		this.jobCounters = jobCounters;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
new file mode 100644
index 0000000..cf2fa1f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.apache.eagle.jobrunning.counter.JobCounters;
+
+public class JobDetailInfo { // Mutable bean with per-job timing, task/attempt counts and counters; element type of Jobs.job
+
+	private long startTime; // presumably epoch milliseconds - TODO confirm
+	private long finishTime; // presumably epoch milliseconds - TODO confirm
+	private long elapsedTime; // presumably a duration in milliseconds - TODO confirm
+	private String id;
+	private String name;
+	private String user;
+	private String state;
+	private int mapsTotal;
+	private int mapsCompleted;
+	private int reducesTotal;
+	private int reducesCompleted;
+	private double mapProgress; // NOTE(review): scale (0-1 vs 0-100) is not visible here - confirm
+	private double reduceProgress; // NOTE(review): scale (0-1 vs 0-100) is not visible here - confirm
+	private int mapsPending;
+	private int mapsRunning;
+	private int reducesPending;
+	private int reducesRunning;
+	private boolean uberized; // exposed through isUberized(), not getUberized()
+	private String diagnostics;
+	private int newReduceAttempts;
+	private int runningReduceAttempts;
+	private int failedReduceAttempts;
+	private int killedReduceAttempts;
+	private int successfulReduceAttempts;
+	private int newMapAttempts;
+	private int runningMapAttempts;
+	private int failedMapAttempts;
+	private int killedMapAttempts;
+	private int successfulMapAttempts;
+	private String queue;
+	private org.apache.eagle.jobrunning.counter.JobCounters jobcounter; // counter.JobCounters (also the imported one), not the same-package yarn.model.JobCounters
+	
+	public long getStartTime() {
+		return startTime;
+	}
+	public void setStartTime(long startTime) {
+		this.startTime = startTime;
+	}
+	public long getFinishTime() {
+		return finishTime;
+	}
+	public void setFinishTime(long finishTime) {
+		this.finishTime = finishTime;
+	}
+	public long getElapsedTime() {
+		return elapsedTime;
+	}
+	public void setElapsedTime(long elapsedTime) {
+		this.elapsedTime = elapsedTime;
+	}
+	public String getId() {
+		return id;
+	}
+	public void setId(String id) {
+		this.id = id;
+	}
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	public String getUser() {
+		return user;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public String getState() {
+		return state;
+	}
+	public void setState(String state) {
+		this.state = state;
+	}
+	public int getMapsTotal() {
+		return mapsTotal;
+	}
+	public void setMapsTotal(int mapsTotal) {
+		this.mapsTotal = mapsTotal;
+	}
+	public int getMapsCompleted() {
+		return mapsCompleted;
+	}
+	public void setMapsCompleted(int mapsCompleted) {
+		this.mapsCompleted = mapsCompleted;
+	}
+	public int getReducesTotal() {
+		return reducesTotal;
+	}
+	public void setReducesTotal(int reducesTotal) {
+		this.reducesTotal = reducesTotal;
+	}
+	public int getReducesCompleted() {
+		return reducesCompleted;
+	}
+	public void setReducesCompleted(int reducesCompleted) {
+		this.reducesCompleted = reducesCompleted;
+	}
+	public double getMapProgress() {
+		return mapProgress;
+	}
+	public void setMapProgress(double mapProgress) {
+		this.mapProgress = mapProgress;
+	}
+	public double getReduceProgress() {
+		return reduceProgress;
+	}
+	public void setReduceProgress(double reduceProgress) {
+		this.reduceProgress = reduceProgress;
+	}
+	public int getMapsPending() {
+		return mapsPending;
+	}
+	public void setMapsPending(int mapsPending) {
+		this.mapsPending = mapsPending;
+	}
+	public int getMapsRunning() {
+		return mapsRunning;
+	}
+	public void setMapsRunning(int mapsRunning) {
+		this.mapsRunning = mapsRunning;
+	}
+	public int getReducesPending() {
+		return reducesPending;
+	}
+	public void setReducesPending(int reducesPending) {
+		this.reducesPending = reducesPending;
+	}
+	public int getReducesRunning() {
+		return reducesRunning;
+	}
+	public void setReducesRunning(int reducesRunning) {
+		this.reducesRunning = reducesRunning;
+	}
+	public boolean isUberized() {
+		return uberized;
+	}
+	public void setUberized(boolean uberized) {
+		this.uberized = uberized;
+	}
+	public String getDiagnostics() {
+		return diagnostics;
+	}
+	public void setDiagnostics(String diagnostics) {
+		this.diagnostics = diagnostics;
+	}
+	public int getNewReduceAttempts() {
+		return newReduceAttempts;
+	}
+	public void setNewReduceAttempts(int newReduceAttempts) {
+		this.newReduceAttempts = newReduceAttempts;
+	}
+	public int getRunningReduceAttempts() {
+		return runningReduceAttempts;
+	}
+	public void setRunningReduceAttempts(int runningReduceAttempts) {
+		this.runningReduceAttempts = runningReduceAttempts;
+	}
+	public int getFailedReduceAttempts() {
+		return failedReduceAttempts;
+	}
+	public void setFailedReduceAttempts(int failedReduceAttempts) {
+		this.failedReduceAttempts = failedReduceAttempts;
+	}
+	public int getKilledReduceAttempts() {
+		return killedReduceAttempts;
+	}
+	public void setKilledReduceAttempts(int killedReduceAttempts) {
+		this.killedReduceAttempts = killedReduceAttempts;
+	}
+	public int getSuccessfulReduceAttempts() {
+		return successfulReduceAttempts;
+	}
+	public void setSuccessfulReduceAttempts(int successfulReduceAttempts) {
+		this.successfulReduceAttempts = successfulReduceAttempts;
+	}
+	public int getNewMapAttempts() {
+		return newMapAttempts;
+	}
+	public void setNewMapAttempts(int newMapAttempts) {
+		this.newMapAttempts = newMapAttempts;
+	}
+	public int getRunningMapAttempts() {
+		return runningMapAttempts;
+	}
+	public void setRunningMapAttempts(int runningMapAttempts) {
+		this.runningMapAttempts = runningMapAttempts;
+	}
+	public int getFailedMapAttempts() {
+		return failedMapAttempts;
+	}
+	public void setFailedMapAttempts(int failedMapAttempts) {
+		this.failedMapAttempts = failedMapAttempts;
+	}
+	public int getKilledMapAttempts() {
+		return killedMapAttempts;
+	}
+	public void setKilledMapAttempts(int killedMapAttempts) {
+		this.killedMapAttempts = killedMapAttempts;
+	}
+	public int getSuccessfulMapAttempts() {
+		return successfulMapAttempts;
+	}
+	public void setSuccessfulMapAttempts(int successfulMapAttempts) {
+		this.successfulMapAttempts = successfulMapAttempts;
+	}
+	public String getQueue() {
+		return queue;
+	}
+	public void setQueue(String queue) {
+		this.queue = queue;
+	}
+	public JobCounters getJobcounter() {
+		return jobcounter;
+	}
+	public void setJobcounter(JobCounters jobcounter) {
+		this.jobcounter = jobcounter;
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
new file mode 100644
index 0000000..a9637f7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Bean exposing a list of {@link JobDetailInfo} through the {@code job} property.
+ * <p>
+ * Per the Jackson annotations on the class, unknown properties are ignored when reading and
+ * {@code null} properties are left out when writing.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+public class Jobs {
+
+	private List<JobDetailInfo> job;
+
+	/**
+	 * @return the {@code job} list, or {@code null} when none has been set
+	 */
+	public List<JobDetailInfo> getJob() {
+		return this.job;
+	}
+
+	/**
+	 * @param job list to expose through {@link #getJob()}
+	 */
+	public void setJob(final List<JobDetailInfo> job) {
+		this.job = job;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
new file mode 100644
index 0000000..c139ccf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+/**
+ * Bean exposing a single {@link Jobs} instance through the {@code jobs} property.
+ * <p>
+ * Per the Jackson annotations on the class, unknown properties are ignored when reading and
+ * {@code null} properties are left out when writing.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+public class JobsWrapper {
+
+	private Jobs jobs;
+
+	/**
+	 * @return the wrapped {@link Jobs}, or {@code null} when none has been set
+	 */
+	public Jobs getJobs() {
+		return this.jobs;
+	}
+
+	/**
+	 * @param jobs instance to expose through {@link #getJobs()}
+	 */
+	public void setJobs(final Jobs jobs) {
+		this.jobs = jobs;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
new file mode 100644
index 0000000..6af2a19
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.zkres;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Operations for recording and pruning the set of processed jobs, kept separately per
+ * {@link JobConstants.ResourceType}.
+ */
+public interface JobRunningZKStateLCM {
+
+	/**
+	 * @param type resource type whose processed jobs are requested
+	 * @return the recorded job entries for {@code type}
+	 */
+	List<String> readProcessedJobs(JobConstants.ResourceType type);
+
+	/**
+	 * Records {@code jobID} as processed for {@code type}.
+	 *
+	 * @param type  resource type the job was processed for
+	 * @param jobID identifier of the processed job
+	 */
+	void addProcessedJob(JobConstants.ResourceType type, String jobID);
+
+	/**
+	 * Removes the jobs of {@code type} that are older than {@code date}.
+	 *
+	 * @param type resource type to prune
+	 * @param date threshold day, date format e.g. "20150901"
+	 */
+	void truncateJobBefore(JobConstants.ResourceType type, String date);
+
+	/**
+	 * Removes the record of a single processed job.
+	 *
+	 * @param type  resource type the job was recorded under
+	 * @param jobID identifier of the job to forget
+	 */
+	void truncateProcessedJob(JobConstants.ResourceType type, String jobID);
+
+	/**
+	 * Removes all recorded state.
+	 */
+	void truncateEverything();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
new file mode 100644
index 0000000..41da60a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.zkres;
+
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+public class JobRunningZKStateManager implements JobRunningZKStateLCM{
+	public static final Logger LOG = LoggerFactory.getLogger(JobRunningZKStateManager.class);
+	private String zkRoot;
+	private CuratorFramework _curator;
+	
+	public static final String DATE_FORMAT_PATTERN = "yyyyMMdd";
+	
+	private CuratorFramework newCurator(RunningJobCrawlConfig config) throws Exception {
+        return CuratorFrameworkFactory.newClient(
+        	config.zkStateConfig.zkQuorum,
+            config.zkStateConfig.zkSessionTimeoutMs,
+            15000,
+            new RetryNTimes(config.zkStateConfig.zkRetryTimes, config.zkStateConfig.zkRetryInterval)
+        );
+    }
+	  
+	public JobRunningZKStateManager(RunningJobCrawlConfig config) {
+		this.zkRoot = config.zkStateConfig.zkRoot;
+        try {
+            _curator = newCurator(config);
+            _curator.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+	
+	public void close() {
+        _curator.close();
+        _curator = null;
+    }
+	
+	public long getTimestampFromDate(String dateStr) throws ParseException {
+		SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_PATTERN);
+        sdf.setTimeZone(EagleConfigFactory.load().getTimeZone());
+		Date d = sdf.parse(dateStr);
+		return d.getTime();
+	}
+	
+	@Override
+	public List<String> readProcessedJobs(JobConstants.ResourceType type) {
+		String path = zkRoot + "/" + type.name() + "/jobs";
+		InterProcessMutex lock = new InterProcessMutex(_curator, path);
+		try {
+			lock.acquire();
+            if (_curator.checkExists().forPath(path) != null) {
+            	LOG.info("Got processed job list from zk, type: " + type.name());
+            	return _curator.getChildren().forPath(path);
+            } else {
+            	LOG.info("Currently processed job list is empty, type: " + type.name());
+                return new ArrayList<String>();
+            }
+        } catch (Exception e) {
+        	LOG.error("fail read processed jobs", e);
+            throw new RuntimeException(e);
+        }
+		finally {
+        	try{
+        		lock.release();
+        	}catch(Exception e){
+        		LOG.error("fail releasing lock", e);
+        		throw new RuntimeException(e);
+        	}
+		}
+	}
+
+	@Override
+	public void addProcessedJob(JobConstants.ResourceType type, String jobID) {
+		String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
+		try {
+			//we record date for cleanup, e.g. cleanup job's znodes whose created date < 20150801
+			String date = DateTimeUtil.format(System.currentTimeMillis(), DATE_FORMAT_PATTERN);
+			LOG.info("add processed job, jobID: " + jobID + ", type: " + type + ", date: " + date);
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, date.getBytes(StandardCharsets.UTF_8));
+            }
+            else {
+                LOG.warn("Job already exist in zk, skip the job: " + jobID + " , type: " + type);
+            }
+        } catch (Exception e) {
+        	LOG.error("fail adding processed jobs", e);
+            throw new RuntimeException(e);
+        }
+	}
+
+	@Override
+	public void truncateJobBefore(JobConstants.ResourceType type, String date) {
+		String path = zkRoot + "/" + type.name() + "/jobs";
+		InterProcessMutex lock = new InterProcessMutex(_curator, path);
+		try {
+			lock.acquire();
+    		long thresholdTime = getTimestampFromDate(date);
+            if (_curator.checkExists().forPath(path) != null) {
+    			LOG.info("Going to delete processed job before " + date + ", type: " + type);
+            	List<String> jobIDList = _curator.getChildren().forPath(path);
+            	for(String jobID : jobIDList) {
+            		if (!jobID.startsWith("job_")) continue; // skip lock node
+            		String jobPath = path + "/" + jobID;
+            		long createTime = getTimestampFromDate(new String(_curator.getData().forPath(jobPath), StandardCharsets.UTF_8));
+            		if (createTime < thresholdTime) {
+            			LOG.info("Going to truncate job: " + jobPath);
+        				_curator.delete().deletingChildrenIfNeeded().forPath(jobPath);
+            		}
+            	}
+            }
+            else {
+            	LOG.info("Currently processed job list is empty, type: " + type.name());                
+            }
+        } catch (Exception e) {
+        	LOG.error("fail deleting processed jobs", e);
+            throw new RuntimeException(e);
+        }
+		finally {
+        	try{
+        		lock.release();
+        	}catch(Exception e){
+        		LOG.error("fail releasing lock", e);
+        		throw new RuntimeException(e);
+        	}
+		}
+	}
+	
+	@Override
+	public void truncateProcessedJob(JobConstants.ResourceType type, String jobID) {
+		LOG.info("trying to truncate all data for job " + jobID);
+		String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
+		InterProcessMutex lock = new InterProcessMutex(_curator, path);
+		try {
+			lock.acquire();
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+                LOG.info("really truncated all data for jobID: " + jobID);
+            }
+        } catch (Exception e) {
+        	LOG.error("fail truncating processed jobs", e);
+    		throw new RuntimeException(e);
+        }
+		finally {
+        	try{
+        		lock.release();
+        	}catch(Exception e){
+        		LOG.error("fail releasing lock", e);
+        		throw new RuntimeException(e);
+        	}
+		}
+	}
+
+	@Override
+	public void truncateEverything() {
+		String path = zkRoot;
+		InterProcessMutex lock = new InterProcessMutex(_curator, path);
+		try{
+			lock.acquire();
+			if(_curator.checkExists().forPath(path) != null){
+				_curator.delete().deletingChildrenIfNeeded().forPath(path);
+			}
+		}catch(Exception ex){
+			LOG.error("fail truncating verything", ex);
+			throw new RuntimeException(ex);
+		}
+		finally {
+        	try{
+        		lock.release();
+        	}catch(Exception e){
+        		LOG.error("fail releasing lock", e);
+        		throw new RuntimeException(e);
+        	}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
index ee12356..9a44f89 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
@@ -16,7 +16,7 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 2000,    
     "RMEndPoints" : "http://localhost:8088/",
-    "partitionerCls" : "eagle.job.DefaultJobPartitionerImpl"
+    "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl"
   },
   "eagleProps" : {
     "site" : "sandbox",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
deleted file mode 100644
index a0297c6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.typesafe.config.Config;
-
-/**
- * Base class for objects that hand out a Storm spout.
- * <p>
- * A spout is a special part of a Storm topology; the concrete spout implementation is obtained
- * from {@link #getSpout(Config)}.
- */
-public abstract class AbstractStormSpoutProvider {
-
-	/**
-	 * @param context configuration made available to the provider
-	 * @return the underlying spout implementation
-	 */
-	public abstract BaseRichSpout getSpout(Config context);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
deleted file mode 100644
index 7811370..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
-
-import com.typesafe.config.Config;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;
-
-public class DataCollectionHDFSSpout extends HDFSSpout{
-
-	private static final long serialVersionUID = 8775646842131298552L;
-	private Config configContext;
-	private TopologyContext _context; 
-	SpoutOutputCollector _collector;
-	private Map<String, Boolean> processFileMap = null; 
-	private static final Logger LOG = LoggerFactory.getLogger(DataCollectionHDFSSpout.class);
-	
-	public DataCollectionHDFSSpout(Config configContext){
-		this.configContext = configContext;
-		processFileMap = new HashMap<String, Boolean>();
-		LOG.info("DataCollectionHDFSSpout called");
-		
-	}
-	
-	public void copyFiles(){
-		LOG.info("Inside listFiles()");
-		Configuration conf = new Configuration(); 
-		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-		ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-		if(LOG.isDebugEnabled()) {
-			for (URL url : urls) {
-				LOG.debug(url.getFile());
-			}
-		}
-		// _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-       
-		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-		LOG.info("HDFS path: " + hdfsPath);
-		 
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("copyToPath: " + copyToPath);
-		String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
-		Path srcPath = new Path(srcPathStr); 
-		LOG.info("listFiles called");
-		LOG.info("srcPath: " + srcPath);
-		try {
-			FileSystem fs = srcPath.getFileSystem(conf);
-			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
-			CompressionCodec codec = codecFactory.getCodec(srcPath);
-			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-			*/
-			
-			Path destPath = new Path(copyToPath);
-			LOG.info("Destination path: " + destPath);
-			fs.copyToLocalFile(srcPath, destPath);
-			LOG.info("Copy to local succeed");
-			fs.close();
-							
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-	}
-	
-	private List<String> getAllFiles(String root, int level){
-		
-		List<String> lists = new ArrayList<String>();
-		File rootFile = new File(root);
-		File[] tempList = rootFile.listFiles();
-		if(tempList == null)
-			return lists; 
-		
-		for(File temp:tempList){
-			if(temp.isDirectory())
-				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-			else{
-				if(temp.getName().endsWith(".gz") || temp.getName().endsWith(".csv"))
-					lists.add(temp.getAbsolutePath());
-			}
-		}
-		return lists;
-			
-	}
-	
-	public List<String> listFiles(){
-		
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("Reading from: " + copyToPath);
-		List<String> files = new ArrayList<String>();
-		files = getAllFiles(copyToPath, 0); 
-		return files;
-	}
-	
-	@Override
-	public void nextTuple() {
-		LOG.info("Releasing nextTuple");
-		List<String> files = listFiles();
-		LOG.info("Files returned: " + files.size());
-		String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-		LOG.info("typeOfFile returned: " + typeOfFile);
-		
-		for(String fileName:files){
-			LOG.info("fileName: " + fileName);
-			LOG.info("processFileMap.get(fileName): " + processFileMap.get(fileName));
-			if(processFileMap.get(fileName) == null || processFileMap.get(fileName) == false){
-				processFileMap.put(fileName, true);
-				BufferedReader br = null; 
-				Reader decoder = null;
-				GZIPInputStream in = null; 
-				InputStream inStream = null;
-				
-				try{
-					if(typeOfFile.equalsIgnoreCase("GZIP")){
-						in = new GZIPInputStream(new FileInputStream(new File(fileName)));
-						decoder = new InputStreamReader(in);
-					}else if(typeOfFile.equalsIgnoreCase("CSV")){
-						inStream = new FileInputStream(new File(fileName)); 
-						decoder = new InputStreamReader(inStream);
-					}else{
-						LOG.error("No known file type specified");
-						continue;
-					}
-					
-					br = new BufferedReader(decoder);
-					int lineNo = 0; 
-					String line = "";
-					while((line = br.readLine())!= null){
-						++lineNo;
-			        	//String line = br.readLine();
-			        	//loggerHDFSSpout.info("line number " + lineNo + "is: " + line);
-			        	//if(line == null || line.equalsIgnoreCase(""))
-			        	//	break;
-			        	LOG.info("Emitting line from file: " + fileName);
-			        	//_collector.emit(new ValuesArray(line), lineNo);
-                        _collector.emit(Arrays.asList((Object)line));
-			        	LOG.info("Emitted line no: " + lineNo + " and line: " + line);
-						Utils.sleep(100);
-					}
-				}
-				catch (Exception e) {
-					// TODO: handle exception
-					e.printStackTrace();
-				}finally{
-					try {
-						if(br != null)
-							br.close();
-						if(decoder != null)
-							decoder.close();
-						if(in != null)
-							in.close();
-						if(inStream != null)
-							inStream.close();
-					} catch (IOException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-				}
-			}else{
-				LOG.info("Processed the files before, already done! ");
-				//Utils.sleep(10000);
-			}
-			
-		}
-		
-	}
-	
-	public void fail(Object msgId) {
-	    int transactionId = (Integer) msgId;
-	    LOG.info(transactionId + " failed");
-	}
-	
-	public void ack(Object msgId) {
-	    int transactionId = (Integer) msgId;
-	    LOG.info(transactionId + " acknowledged");
-	}
-
-	@Override
-	public void open(Map arg0, TopologyContext context,
-			SpoutOutputCollector collector) {
-		 _collector = collector;
-		 _context = context;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// TODO Auto-generated method stub
-		declarer.declare(new Fields("line"));
-	}
-}