You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:46 UTC
[39/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
new file mode 100644
index 0000000..48c7dac
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteCounterServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobCompleteCounterServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameters[0] = historyBaseUrl, parameters[1] = jobID
+ // {historyUrl}/jobhistory/jobcounters/job_xxxxxxxxxxxxx_xxxxx?anonymous=true
+ return parameters[0] + "jobhistory/jobcounters/" + parameters[1]
+ + "?" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
new file mode 100644
index 0000000..9c4e9ba
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompleteDetailServiceURLBuilderImpl.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobCompleteDetailServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameters[0] = baseUrl, parameters[1] = jobID
+ // {baseUrl}/ws/v1/cluster/apps/job_xxxxxxxxxxxxx_xxxxx?anonymous=true
+ return parameters[0] + JobConstants.V2_COMPLETE_APPS_URL
+ + JobUtils.getAppIDByJobID(parameters[1])
+ + "?" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
new file mode 100644
index 0000000..077cb0c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCompletedConfigServiceURLBuilderImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+
+public class JobCompletedConfigServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameters[0] = baseUrl, parameters[1] = jobID
+ // {historyUrl}/jobhistory/conf/job_xxxxxxxxxxxxx_xxxxxx
+ return parameters[0] + "jobhistory/conf/" + parameters[1];
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
new file mode 100644
index 0000000..ad2aadd
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobCountersServiceURLBuilderImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobCountersServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameter[0] = baseUrl, parameter[1] = appID;
+ // {rmUrl}/proxy/application_xxxxxxxxxxxxx_xxxxxx/ws/v1/mapreduce/jobs/job_xxxxxxxxxxxxx_xxxxxx/counters?anonymous=true"
+ return parameters[0] + JobConstants.V2_PROXY_PREFIX_URL +
+ parameters[1] + JobConstants.V2_MR_APPMASTER_PREFIX +
+ JobUtils.getJobIDByAppID(parameters[1]) + JobConstants.V2_MR_COUNTERS_URL +
+ "?" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
new file mode 100644
index 0000000..ead9077
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobDetailServiceURLBuilderImpl.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobDetailServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String... parameters) {
+ // parameter[0] = baseUrl , parameter[1] = appID
+ // {rmUrl}/proxy/application_xxx/ws/v1/mapreduce/jobs?anonymous=true
+ return parameters[0] + JobConstants.V2_PROXY_PREFIX_URL + parameters[1] + JobConstants.V2_APP_DETAIL_URL + "?" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
new file mode 100644
index 0000000..92cb09a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobListServiceURLBuilderImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // {rmUrl}/ws/v1/cluster/apps?state=RUNNING
+ String jobState = parameters[1];
+ if (jobState.equals(JobConstants.JobState.RUNNING.name())) {
+ return parameters[0] + JobConstants.V2_APPS_RUNNING_URL + "&" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+ else if (jobState.equals(JobConstants.JobState.COMPLETED.name())) {
+ return parameters[0] + JobConstants.V2_APPS_COMPLETED_URL + "&" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+ else if (jobState.equals(JobConstants.JobState.ALL.name())) {
+ return parameters[0] + JobConstants.V2_APPS_URL + "&" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
new file mode 100644
index 0000000..bc883d4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobRunningConfigServiceURLBuilderImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.util.JobUtils;
+
+public class JobRunningConfigServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameters[0] = baseUrl, parameters[1] = jobID
+ // {baseUrl}/proxy/application_xxxxxxxxxxxxx_xxxxx/ws/v1/mapreduce/jobs/job_xxxxxxxxxxxxx_xxxxx/conf
+ String urlString = parameters[0] + JobConstants.V2_PROXY_PREFIX_URL
+ + JobUtils.getAppIDByJobID(parameters[1]) + JobConstants.V2_MR_APPMASTER_PREFIX
+ + parameters[1] + JobConstants.V2_CONF_URL
+ + "?" + JobConstants.ANONYMOUS_PARAMETER;
+ return urlString;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
new file mode 100644
index 0000000..bfe162c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/JobStatusServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobStatusServiceURLBuilderImpl implements ServiceURLBuilder {
+
+ public String build(String ... parameters) {
+ // parameters[0] = rmUrl, parameters[1] = appID
+ // {rmUrl}/ws/v1/cluster/apps/application_xxxxxxxxxxxxx_xxxxx?anonymous=true
+ return parameters[0] + "ws/v1/cluster/apps/" + parameters[1]
+ + "?" + JobConstants.ANONYMOUS_PARAMETER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
new file mode 100644
index 0000000..5a50462
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/url/ServiceURLBuilder.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.url;
+
+public interface ServiceURLBuilder {
+ String build(String ... parameters);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
new file mode 100644
index 0000000..62a15af
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class InputStreamUtils {
+
+ private static final int CONNECTION_TIMEOUT = 10 * 1000;
+ private static final int READ_TIMEOUT = 5 * 60 * 1000;
+ private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+ private static final String GZIP_COMPRESSION = "gzip";
+
+ private static InputStream openGZIPInputStream(URL url, int timeout) throws IOException {
+ final URLConnection connection = url.openConnection();
+ connection.setConnectTimeout(CONNECTION_TIMEOUT);
+ connection.setReadTimeout(timeout);
+ connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
+ return new GZIPInputStream(connection.getInputStream());
+ }
+
+ private static InputStream openInputStream(URL url, int timeout) throws IOException {
+ URLConnection connection = url.openConnection();
+ connection.setConnectTimeout(timeout);
+ return connection.getInputStream();
+ }
+
+ public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType, int timeout) throws Exception {
+ final URL url = URLConnectionUtils.getUrl(urlString);
+ if (compressionType.equals(JobConstants.CompressionType.GZIP)) {
+ return openGZIPInputStream(url, timeout);
+ }
+ else { // CompressionType.NONE
+ return openInputStream(url, timeout);
+ }
+ }
+
+ public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType) throws Exception {
+ return getInputStream(urlString, compressionType, READ_TIMEOUT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
new file mode 100644
index 0000000..9af37ca
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/JobUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public class JobUtils {
+
+ public static String checkAndAddLastSlash(String urlBase) {
+ if (!urlBase.endsWith("/")) {
+ return urlBase + "/";
+ }
+ return urlBase;
+ }
+
+ public static String getJobIDByAppID(String appID) {
+ if (appID.startsWith(JobConstants.APPLICATION_PREFIX)) {
+ return appID.replace(JobConstants.APPLICATION_PREFIX, JobConstants.JOB_PREFIX);
+ }
+ return null;
+ }
+
+ public static String getAppIDByJobID(String jobID) {
+ if (jobID.startsWith(JobConstants.JOB_PREFIX)) {
+ return jobID.replace(JobConstants.JOB_PREFIX, JobConstants.APPLICATION_PREFIX);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
new file mode 100644
index 0000000..314fb0a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/URLConnectionUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.util;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class URLConnectionUtils {
+ //TODO: change some public method to private
+ private static final Logger LOG = LoggerFactory.getLogger(URLConnectionUtils.class);
+
+ public static URLConnection getConnection(String url) throws Exception {
+ if (url.startsWith("https://")) {
+ return getHTTPSConnection(url);
+ } else if (url.startsWith("http://")) {
+ return getHTTPConnection(url);
+ }
+ throw new Exception("Invalid input argument url: " + url);
+ }
+
+ public static URLConnection getHTTPConnection(String urlString) throws Exception {
+ final URL url = new URL(urlString);
+ return url.openConnection();
+ }
+
+ public static URL getUrl(String urlString) throws Exception {
+ if(urlString.toLowerCase().contains("https")){
+ return getHTTPSUrl(urlString);
+ }else if (urlString.toLowerCase().contains("http")) {
+ return getURL(urlString);
+ }
+ throw new Exception("Invalid input argument url: " + urlString);
+ }
+
+ public static URL getURL(String urlString) throws MalformedURLException {
+ return new URL(urlString);
+ }
+
+ public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException {
+ // Create a trust manager that does not validate certificate chains
+ final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
+ // Install the all-trusting trust manager
+ final SSLContext sc = SSLContext.getInstance("SSL");
+ sc.init(null, trustAllCerts, new java.security.SecureRandom());
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+ // Create all-trusting host name verifier
+ final HostnameVerifier allHostsValid = new HostnameVerifier() {
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
+ };
+ HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
+ return new URL(urlString);
+ }
+
+ public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException {
+ final URL url = getHTTPSUrl(urlString);
+ return url.openConnection();
+ }
+
+ public static class TrustAllX509TrustManager implements X509TrustManager {
+ @Override
+ public void checkClientTrusted(
+ java.security.cert.X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(
+ java.security.cert.X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
+
+ @Override
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
new file mode 100644
index 0000000..85a717d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/App.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class App {
+ private String id;
+ private String user;
+ private String name;
+ private String queue;
+ private String state;
+ private String finalStatus;
+ private double progress;
+ private String trackingUI;
+ private String trackingUrl;
+ private String diagnostics;
+ private String clusterId;
+ private String applicationType;
+ private long startedTime;
+ private long finishedTime;
+ private long elapsedTime;
+ private String amContainerLogs;
+ private String amHostHttpAddress;
+
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public String getUser() {
+ return user;
+ }
+ public void setUser(String user) {
+ this.user = user;
+ }
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getQueue() {
+ return queue;
+ }
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+ public String getState() {
+ return state;
+ }
+ public void setState(String state) {
+ this.state = state;
+ }
+ public String getFinalStatus() {
+ return finalStatus;
+ }
+ public void setFinalStatus(String finalStatus) {
+ this.finalStatus = finalStatus;
+ }
+ public double getProgress() {
+ return progress;
+ }
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+ public String getTrackingUI() {
+ return trackingUI;
+ }
+ public void setTrackingUI(String trackingUI) {
+ this.trackingUI = trackingUI;
+ }
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ }
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+ public void setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+ public String getClusterId() {
+ return clusterId;
+ }
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+ public String getApplicationType() {
+ return applicationType;
+ }
+ public void setApplicationType(String applicationType) {
+ this.applicationType = applicationType;
+ }
+ public long getStartedTime() {
+ return startedTime;
+ }
+ public void setStartedTime(long startedTime) {
+ this.startedTime = startedTime;
+ }
+ public long getFinishedTime() {
+ return finishedTime;
+ }
+ public void setFinishedTime(long finishedTime) {
+ this.finishedTime = finishedTime;
+ }
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+ public String getAmContainerLogs() {
+ return amContainerLogs;
+ }
+ public void setAmContainerLogs(String amContainerLogs) {
+ this.amContainerLogs = amContainerLogs;
+ }
+ public String getAmHostHttpAddress() {
+ return amHostHttpAddress;
+ }
+ public void setAmHostHttpAddress(String amHostHttpAddress) {
+ this.amHostHttpAddress = amHostHttpAddress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
new file mode 100644
index 0000000..d1a4407
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppInfo.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppInfo {
+ String id;
+ String user;
+ String name;
+ String queue;
+ String state;
+ String finalStatus;
+ double progress;
+ String trackingUI;
+ String trackingUrl;
+ String diagnostics;
+ String clusterId;
+ String applicationType;
+ long startedTime;
+ long finishedTime;
+ long elapsedTime;
+ String amContainerLogs;
+ String amHostHttpAddress;
+
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public String getUser() {
+ return user;
+ }
+ public void setUser(String user) {
+ this.user = user;
+ }
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getQueue() {
+ return queue;
+ }
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+ public String getState() {
+ return state;
+ }
+ public void setState(String state) {
+ this.state = state;
+ }
+ public String getFinalStatus() {
+ return finalStatus;
+ }
+ public void setFinalStatus(String finalStatus) {
+ this.finalStatus = finalStatus;
+ }
+ public double getProgress() {
+ return progress;
+ }
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+ public String getTrackingUI() {
+ return trackingUI;
+ }
+ public void setTrackingUI(String trackingUI) {
+ this.trackingUI = trackingUI;
+ }
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ }
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+ public void setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+ public String getClusterId() {
+ return clusterId;
+ }
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+ public String getApplicationType() {
+ return applicationType;
+ }
+ public void setApplicationType(String applicationType) {
+ this.applicationType = applicationType;
+ }
+ public long getStartedTime() {
+ return startedTime;
+ }
+ public void setStartedTime(long startedTime) {
+ this.startedTime = startedTime;
+ }
+ public long getFinishedTime() {
+ return finishedTime;
+ }
+ public void setFinishedTime(long finishedTime) {
+ this.finishedTime = finishedTime;
+ }
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+ public String getAmContainerLogs() {
+ return amContainerLogs;
+ }
+ public void setAmContainerLogs(String amContainerLogs) {
+ this.amContainerLogs = amContainerLogs;
+ }
+ public String getAmHostHttpAddress() {
+ return amHostHttpAddress;
+ }
+ public void setAmHostHttpAddress(String amHostHttpAddress) {
+ this.amHostHttpAddress = amHostHttpAddress;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
new file mode 100644
index 0000000..1f807d1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppWrapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppWrapper {
+
+ private AppInfo app;
+
+ public AppInfo getApp() {
+ return app;
+ }
+
+ public void setApp(AppInfo app) {
+ this.app = app;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
new file mode 100644
index 0000000..b76ee6b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Applications.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Applications {
+
+ private List<AppInfo> app;
+
+ public List<AppInfo> getApp() {
+ return app;
+ }
+
+ public void setApp(List<AppInfo> app) {
+ this.app = app;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
new file mode 100644
index 0000000..79abd29
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppsWrapper {
+
+ private Applications apps;
+
+ public Applications getApps() {
+ return apps;
+ }
+
+ public void setApps(Applications apps) {
+ this.apps = apps;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
new file mode 100644
index 0000000..afc487e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Counter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+public class Counter {
+
+ private String name;
+ private long totalCounterValue;
+ private long mapCounterValue;
+ private long reduceCounterValue;
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public long getTotalCounterValue() {
+ return totalCounterValue;
+ }
+ public void setTotalCounterValue(long totalCounterValue) {
+ this.totalCounterValue = totalCounterValue;
+ }
+ public long getMapCounterValue() {
+ return mapCounterValue;
+ }
+ public void setMapCounterValue(long mapCounterValue) {
+ this.mapCounterValue = mapCounterValue;
+ }
+ public long getReduceCounterValue() {
+ return reduceCounterValue;
+ }
+ public void setReduceCounterValue(long reduceCounterValue) {
+ this.reduceCounterValue = reduceCounterValue;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
new file mode 100644
index 0000000..22e498c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/CounterGroup.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+public class CounterGroup {
+
+ private String counterGroupName;
+ private List<Counter> counter;
+
+ public String getCounterGroupName() {
+ return counterGroupName;
+ }
+ public void setCounterGroupName(String counterGroupName) {
+ this.counterGroupName = counterGroupName;
+ }
+ public List<Counter> getCounter() {
+ return counter;
+ }
+ public void setCounter(List<Counter> counter) {
+ this.counter = counter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
new file mode 100644
index 0000000..124dc65
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCompleteWrapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobCompleteWrapper {
+ private App app;
+
+ public App getApp() {
+ return app;
+ }
+
+ public void setApp(App app) {
+ this.app = app;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
new file mode 100644
index 0000000..76b722c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCounters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+public class JobCounters {
+
+ private String id;
+ private List<CounterGroup> counterGroup;
+
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public List<CounterGroup> getCounterGroup() {
+ return counterGroup;
+ }
+ public void setCounterGroup(List<CounterGroup> counterGroup) {
+ this.counterGroup = counterGroup;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
new file mode 100644
index 0000000..f3bb136
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobCountersWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+
+public class JobCountersWrapper {
+
+ private JobCounters jobCounters;
+
+ public JobCounters getJobCounters() {
+ return jobCounters;
+ }
+
+ public void setJobCounters(JobCounters jobCounters) {
+ this.jobCounters = jobCounters;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
new file mode 100644
index 0000000..cf2fa1f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobDetailInfo.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.apache.eagle.jobrunning.counter.JobCounters;
+
+public class JobDetailInfo {
+
+ private long startTime;
+ private long finishTime;
+ private long elapsedTime;
+ private String id;
+ private String name;
+ private String user;
+ private String state;
+ private int mapsTotal;
+ private int mapsCompleted;
+ private int reducesTotal;
+ private int reducesCompleted;
+ private double mapProgress;
+ private double reduceProgress;
+ private int mapsPending;
+ private int mapsRunning;
+ private int reducesPending;
+ private int reducesRunning;
+ private boolean uberized;
+ private String diagnostics;
+ private int newReduceAttempts;
+ private int runningReduceAttempts;
+ private int failedReduceAttempts;
+ private int killedReduceAttempts;
+ private int successfulReduceAttempts;
+ private int newMapAttempts;
+ private int runningMapAttempts;
+ private int failedMapAttempts;
+ private int killedMapAttempts;
+ private int successfulMapAttempts;
+ private String queue;
+ private org.apache.eagle.jobrunning.counter.JobCounters jobcounter;
+
+ public long getStartTime() {
+ return startTime;
+ }
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+ public long getFinishTime() {
+ return finishTime;
+ }
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public String getUser() {
+ return user;
+ }
+ public void setUser(String user) {
+ this.user = user;
+ }
+ public String getState() {
+ return state;
+ }
+ public void setState(String state) {
+ this.state = state;
+ }
+ public int getMapsTotal() {
+ return mapsTotal;
+ }
+ public void setMapsTotal(int mapsTotal) {
+ this.mapsTotal = mapsTotal;
+ }
+ public int getMapsCompleted() {
+ return mapsCompleted;
+ }
+ public void setMapsCompleted(int mapsCompleted) {
+ this.mapsCompleted = mapsCompleted;
+ }
+ public int getReducesTotal() {
+ return reducesTotal;
+ }
+ public void setReducesTotal(int reducesTotal) {
+ this.reducesTotal = reducesTotal;
+ }
+ public int getReducesCompleted() {
+ return reducesCompleted;
+ }
+ public void setReducesCompleted(int reducesCompleted) {
+ this.reducesCompleted = reducesCompleted;
+ }
+ public double getMapProgress() {
+ return mapProgress;
+ }
+ public void setMapProgress(double mapProgress) {
+ this.mapProgress = mapProgress;
+ }
+ public double getReduceProgress() {
+ return reduceProgress;
+ }
+ public void setReduceProgress(double reduceProgress) {
+ this.reduceProgress = reduceProgress;
+ }
+ public int getMapsPending() {
+ return mapsPending;
+ }
+ public void setMapsPending(int mapsPending) {
+ this.mapsPending = mapsPending;
+ }
+ public int getMapsRunning() {
+ return mapsRunning;
+ }
+ public void setMapsRunning(int mapsRunning) {
+ this.mapsRunning = mapsRunning;
+ }
+ public int getReducesPending() {
+ return reducesPending;
+ }
+ public void setReducesPending(int reducesPending) {
+ this.reducesPending = reducesPending;
+ }
+ public int getReducesRunning() {
+ return reducesRunning;
+ }
+ public void setReducesRunning(int reducesRunning) {
+ this.reducesRunning = reducesRunning;
+ }
+ public boolean isUberized() {
+ return uberized;
+ }
+ public void setUberized(boolean uberized) {
+ this.uberized = uberized;
+ }
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+ public void setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+ public int getNewReduceAttempts() {
+ return newReduceAttempts;
+ }
+ public void setNewReduceAttempts(int newReduceAttempts) {
+ this.newReduceAttempts = newReduceAttempts;
+ }
+ public int getRunningReduceAttempts() {
+ return runningReduceAttempts;
+ }
+ public void setRunningReduceAttempts(int runningReduceAttempts) {
+ this.runningReduceAttempts = runningReduceAttempts;
+ }
+ public int getFailedReduceAttempts() {
+ return failedReduceAttempts;
+ }
+ public void setFailedReduceAttempts(int failedReduceAttempts) {
+ this.failedReduceAttempts = failedReduceAttempts;
+ }
+ public int getKilledReduceAttempts() {
+ return killedReduceAttempts;
+ }
+ public void setKilledReduceAttempts(int killedReduceAttempts) {
+ this.killedReduceAttempts = killedReduceAttempts;
+ }
+ public int getSuccessfulReduceAttempts() {
+ return successfulReduceAttempts;
+ }
+ public void setSuccessfulReduceAttempts(int successfulReduceAttempts) {
+ this.successfulReduceAttempts = successfulReduceAttempts;
+ }
+ public int getNewMapAttempts() {
+ return newMapAttempts;
+ }
+ public void setNewMapAttempts(int newMapAttempts) {
+ this.newMapAttempts = newMapAttempts;
+ }
+ public int getRunningMapAttempts() {
+ return runningMapAttempts;
+ }
+ public void setRunningMapAttempts(int runningMapAttempts) {
+ this.runningMapAttempts = runningMapAttempts;
+ }
+ public int getFailedMapAttempts() {
+ return failedMapAttempts;
+ }
+ public void setFailedMapAttempts(int failedMapAttempts) {
+ this.failedMapAttempts = failedMapAttempts;
+ }
+ public int getKilledMapAttempts() {
+ return killedMapAttempts;
+ }
+ public void setKilledMapAttempts(int killedMapAttempts) {
+ this.killedMapAttempts = killedMapAttempts;
+ }
+ public int getSuccessfulMapAttempts() {
+ return successfulMapAttempts;
+ }
+ public void setSuccessfulMapAttempts(int successfulMapAttempts) {
+ this.successfulMapAttempts = successfulMapAttempts;
+ }
+ public String getQueue() {
+ return queue;
+ }
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+ public JobCounters getJobcounter() {
+ return jobcounter;
+ }
+ public void setJobcounter(JobCounters jobcounter) {
+ this.jobcounter = jobcounter;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
new file mode 100644
index 0000000..a9637f7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/Jobs.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Jobs {
+
+ private List<JobDetailInfo> job;
+
+ public List<JobDetailInfo> getJob() {
+ return job;
+ }
+
+ public void setJob(List<JobDetailInfo> job) {
+ this.job = job;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
new file mode 100644
index 0000000..c139ccf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/yarn/model/JobsWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.yarn.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobsWrapper {
+
+ private Jobs jobs;
+
+ public Jobs getJobs() {
+ return jobs;
+ }
+
+ public void setJobs(Jobs jobs) {
+ this.jobs = jobs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
new file mode 100644
index 0000000..6af2a19
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateLCM.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.zkres;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public interface JobRunningZKStateLCM {
+
+ List<String> readProcessedJobs(JobConstants.ResourceType type);
+
+ void addProcessedJob(JobConstants.ResourceType type, String jobID);
+
+ // date format e.g. "20150901"
+ void truncateJobBefore(JobConstants.ResourceType type, String date);
+
+ void truncateProcessedJob(JobConstants.ResourceType type, String jobID);
+
+ void truncateEverything();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
new file mode 100644
index 0000000..41da60a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.zkres;
+
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+public class JobRunningZKStateManager implements JobRunningZKStateLCM{
+ public static final Logger LOG = LoggerFactory.getLogger(JobRunningZKStateManager.class);
+ private String zkRoot;
+ private CuratorFramework _curator;
+
+ public static final String DATE_FORMAT_PATTERN = "yyyyMMdd";
+
+ private CuratorFramework newCurator(RunningJobCrawlConfig config) throws Exception {
+ return CuratorFrameworkFactory.newClient(
+ config.zkStateConfig.zkQuorum,
+ config.zkStateConfig.zkSessionTimeoutMs,
+ 15000,
+ new RetryNTimes(config.zkStateConfig.zkRetryTimes, config.zkStateConfig.zkRetryInterval)
+ );
+ }
+
+ public JobRunningZKStateManager(RunningJobCrawlConfig config) {
+ this.zkRoot = config.zkStateConfig.zkRoot;
+ try {
+ _curator = newCurator(config);
+ _curator.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void close() {
+ _curator.close();
+ _curator = null;
+ }
+
+ public long getTimestampFromDate(String dateStr) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_PATTERN);
+ sdf.setTimeZone(EagleConfigFactory.load().getTimeZone());
+ Date d = sdf.parse(dateStr);
+ return d.getTime();
+ }
+
+ @Override
+ public List<String> readProcessedJobs(JobConstants.ResourceType type) {
+ String path = zkRoot + "/" + type.name() + "/jobs";
+ InterProcessMutex lock = new InterProcessMutex(_curator, path);
+ try {
+ lock.acquire();
+ if (_curator.checkExists().forPath(path) != null) {
+ LOG.info("Got processed job list from zk, type: " + type.name());
+ return _curator.getChildren().forPath(path);
+ } else {
+ LOG.info("Currently processed job list is empty, type: " + type.name());
+ return new ArrayList<String>();
+ }
+ } catch (Exception e) {
+ LOG.error("fail read processed jobs", e);
+ throw new RuntimeException(e);
+ }
+ finally {
+ try{
+ lock.release();
+ }catch(Exception e){
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void addProcessedJob(JobConstants.ResourceType type, String jobID) {
+ String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
+ try {
+ //we record date for cleanup, e.g. cleanup job's znodes whose created date < 20150801
+ String date = DateTimeUtil.format(System.currentTimeMillis(), DATE_FORMAT_PATTERN);
+ LOG.info("add processed job, jobID: " + jobID + ", type: " + type + ", date: " + date);
+ if (_curator.checkExists().forPath(path) == null) {
+ _curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, date.getBytes(StandardCharsets.UTF_8));
+ }
+ else {
+ LOG.warn("Job already exist in zk, skip the job: " + jobID + " , type: " + type);
+ }
+ } catch (Exception e) {
+ LOG.error("fail adding processed jobs", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void truncateJobBefore(JobConstants.ResourceType type, String date) {
+ String path = zkRoot + "/" + type.name() + "/jobs";
+ InterProcessMutex lock = new InterProcessMutex(_curator, path);
+ try {
+ lock.acquire();
+ long thresholdTime = getTimestampFromDate(date);
+ if (_curator.checkExists().forPath(path) != null) {
+ LOG.info("Going to delete processed job before " + date + ", type: " + type);
+ List<String> jobIDList = _curator.getChildren().forPath(path);
+ for(String jobID : jobIDList) {
+ if (!jobID.startsWith("job_")) continue; // skip lock node
+ String jobPath = path + "/" + jobID;
+ long createTime = getTimestampFromDate(new String(_curator.getData().forPath(jobPath), StandardCharsets.UTF_8));
+ if (createTime < thresholdTime) {
+ LOG.info("Going to truncate job: " + jobPath);
+ _curator.delete().deletingChildrenIfNeeded().forPath(jobPath);
+ }
+ }
+ }
+ else {
+ LOG.info("Currently processed job list is empty, type: " + type.name());
+ }
+ } catch (Exception e) {
+ LOG.error("fail deleting processed jobs", e);
+ throw new RuntimeException(e);
+ }
+ finally {
+ try{
+ lock.release();
+ }catch(Exception e){
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void truncateProcessedJob(JobConstants.ResourceType type, String jobID) {
+ LOG.info("trying to truncate all data for job " + jobID);
+ String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
+ InterProcessMutex lock = new InterProcessMutex(_curator, path);
+ try {
+ lock.acquire();
+ if (_curator.checkExists().forPath(path) != null) {
+ _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ LOG.info("really truncated all data for jobID: " + jobID);
+ }
+ } catch (Exception e) {
+ LOG.error("fail truncating processed jobs", e);
+ throw new RuntimeException(e);
+ }
+ finally {
+ try{
+ lock.release();
+ }catch(Exception e){
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void truncateEverything() {
+ String path = zkRoot;
+ InterProcessMutex lock = new InterProcessMutex(_curator, path);
+ try{
+ lock.acquire();
+ if(_curator.checkExists().forPath(path) != null){
+ _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ }catch(Exception ex){
+ LOG.error("fail truncating verything", ex);
+ throw new RuntimeException(ex);
+ }
+ finally {
+ try{
+ lock.release();
+ }catch(Exception e){
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
index ee12356..9a44f89 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
@@ -16,7 +16,7 @@
"zkRetryTimes" : 3,
"zkRetryInterval" : 2000,
"RMEndPoints" : "http://localhost:8088/",
- "partitionerCls" : "eagle.job.DefaultJobPartitionerImpl"
+ "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl"
},
"eagleProps" : {
"site" : "sandbox",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
deleted file mode 100644
index a0297c6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.typesafe.config.Config;
-
-/**
- * Normally storm spout is a special part of storm topology and it is implemented in underlying spout implementation
- * which can be retrieved from getSpout method.
- */
-public abstract class AbstractStormSpoutProvider{
- public abstract BaseRichSpout getSpout(Config context);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
deleted file mode 100644
index 7811370..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
-
-import com.typesafe.config.Config;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;
-
-public class DataCollectionHDFSSpout extends HDFSSpout{
-
- private static final long serialVersionUID = 8775646842131298552L;
- private Config configContext;
- private TopologyContext _context;
- SpoutOutputCollector _collector;
- private Map<String, Boolean> processFileMap = null;
- private static final Logger LOG = LoggerFactory.getLogger(DataCollectionHDFSSpout.class);
-
- public DataCollectionHDFSSpout(Config configContext){
- this.configContext = configContext;
- processFileMap = new HashMap<String, Boolean>();
- LOG.info("DataCollectionHDFSSpout called");
-
- }
-
- public void copyFiles(){
- LOG.info("Inside listFiles()");
- Configuration conf = new Configuration();
- // _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
- ClassLoader cl = ClassLoader.getSystemClassLoader();
- URL[] urls = ((URLClassLoader)cl).getURLs();
- if(LOG.isDebugEnabled()) {
- for (URL url : urls) {
- LOG.debug(url.getFile());
- }
- }
- // _________________________________________
- String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnnection");
- LOG.info("HDFS connection string: " + hdfsConnectionStr);
-
- String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
- LOG.info("HDFS path: " + hdfsPath);
-
- String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
- LOG.info("copyToPath: " + copyToPath);
- String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
- Path srcPath = new Path(srcPathStr);
- LOG.info("listFiles called");
- LOG.info("srcPath: " + srcPath);
- try {
- FileSystem fs = srcPath.getFileSystem(conf);
- /*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
- CompressionCodec codec = codecFactory.getCodec(srcPath);
- DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
- */
-
- Path destPath = new Path(copyToPath);
- LOG.info("Destination path: " + destPath);
- fs.copyToLocalFile(srcPath, destPath);
- LOG.info("Copy to local succeed");
- fs.close();
-
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- private List<String> getAllFiles(String root, int level){
-
- List<String> lists = new ArrayList<String>();
- File rootFile = new File(root);
- File[] tempList = rootFile.listFiles();
- if(tempList == null)
- return lists;
-
- for(File temp:tempList){
- if(temp.isDirectory())
- lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
- else{
- if(temp.getName().endsWith(".gz") || temp.getName().endsWith(".csv"))
- lists.add(temp.getAbsolutePath());
- }
- }
- return lists;
-
- }
-
- public List<String> listFiles(){
-
- String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
- LOG.info("Reading from: " + copyToPath);
- List<String> files = new ArrayList<String>();
- files = getAllFiles(copyToPath, 0);
- return files;
- }
-
- @Override
- public void nextTuple() {
- LOG.info("Releasing nextTuple");
- List<String> files = listFiles();
- LOG.info("Files returned: " + files.size());
- String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
- LOG.info("typeOfFile returned: " + typeOfFile);
-
- for(String fileName:files){
- LOG.info("fileName: " + fileName);
- LOG.info("processFileMap.get(fileName): " + processFileMap.get(fileName));
- if(processFileMap.get(fileName) == null || processFileMap.get(fileName) == false){
- processFileMap.put(fileName, true);
- BufferedReader br = null;
- Reader decoder = null;
- GZIPInputStream in = null;
- InputStream inStream = null;
-
- try{
- if(typeOfFile.equalsIgnoreCase("GZIP")){
- in = new GZIPInputStream(new FileInputStream(new File(fileName)));
- decoder = new InputStreamReader(in);
- }else if(typeOfFile.equalsIgnoreCase("CSV")){
- inStream = new FileInputStream(new File(fileName));
- decoder = new InputStreamReader(inStream);
- }else{
- LOG.error("No known file type specified");
- continue;
- }
-
- br = new BufferedReader(decoder);
- int lineNo = 0;
- String line = "";
- while((line = br.readLine())!= null){
- ++lineNo;
- //String line = br.readLine();
- //loggerHDFSSpout.info("line number " + lineNo + "is: " + line);
- //if(line == null || line.equalsIgnoreCase(""))
- // break;
- LOG.info("Emitting line from file: " + fileName);
- //_collector.emit(new ValuesArray(line), lineNo);
- _collector.emit(Arrays.asList((Object)line));
- LOG.info("Emitted line no: " + lineNo + " and line: " + line);
- Utils.sleep(100);
- }
- }
- catch (Exception e) {
- // TODO: handle exception
- e.printStackTrace();
- }finally{
- try {
- if(br != null)
- br.close();
- if(decoder != null)
- decoder.close();
- if(in != null)
- in.close();
- if(inStream != null)
- inStream.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }else{
- LOG.info("Processed the files before, already done! ");
- //Utils.sleep(10000);
- }
-
- }
-
- }
-
- public void fail(Object msgId) {
- int transactionId = (Integer) msgId;
- LOG.info(transactionId + " failed");
- }
-
- public void ack(Object msgId) {
- int transactionId = (Integer) msgId;
- LOG.info(transactionId + " acknowledged");
- }
-
- @Override
- public void open(Map arg0, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
- _context = context;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
- declarer.declare(new Fields("line"));
- }
-}