You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:47 UTC
[40/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
new file mode 100644
index 0000000..fb77bc7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+import org.apache.eagle.jobrunning.crawler.JobContext;
+
+public class DefaultRunningJobInputStreamCallback implements RunningJobCallback{
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultRunningJobInputStreamCallback.class);
+
+ private EagleOutputCollector eagleCollector;
+
+ public DefaultRunningJobInputStreamCallback(EagleOutputCollector eagleCollector){
+ this.eagleCollector = eagleCollector;
+ }
+
+ @Override
+ public void onJobRunningInformation(JobContext context, ResourceType type, List<Object> objects) {
+ String jobId = context.jobId;
+ LOG.info(jobId + " information fetched , type: " + type);
+ if (type.equals(ResourceType.JOB_CONFIGURATION)) {
+ @SuppressWarnings("unchecked")
+ Map<String, String> config = (Map<String, String>) objects.get(0);
+ // the fist field is fixed as messageId
+ RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, context.fetchedTime);
+ eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, config));
+ }
+ else if (type.equals(ResourceType.JOB_RUNNING_INFO) || type.equals(ResourceType.JOB_COMPLETE_INFO)) {
+ // Here timestamp is meaningless, set to null
+ RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, null);
+ eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, objects));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
new file mode 100644
index 0000000..f36e8a5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.eagle.jobrunning.crawler.JobContext;
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Callback invoked when running job info is ready.
+ * <p>
+ * The interface extends {@link Serializable}, so implementing classes are serializable types too.
+ */
+public interface RunningJobCallback extends Serializable{
+
+ /**
+ * This is called when a running job resource is ready.
+ *
+ * @param jobContext context of the job the resource belongs to (exposes jobId, user and fetchedTime)
+ * @param type kind of resource being delivered
+ * @param objects the fetched resource objects; how they are interpreted is up to the implementation
+ *        (for example, DefaultRunningJobInputStreamCallback casts the first element to a
+ *        Map&lt;String, String&gt; when the type is JOB_CONFIGURATION)
+ */
+ void onJobRunningInformation(JobContext jobContext, JobConstants.ResourceType type, List<Object> objects);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
new file mode 100644
index 0000000..08b58f2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.callback;
+
+import com.google.common.base.Objects;
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * Identifier attached to every tuple emitted for a running job resource.
+ * <p>
+ * Two ids are equal when job id, resource type and timestamp are all equal; any of the
+ * three fields may be {@code null}.
+ */
+public class RunningJobMessageId {
+    public String jobID;
+    public JobConstants.ResourceType type;
+    // Nullable. DefaultRunningJobInputStreamCallback passes the fetched time for JOB_CONFIGURATION
+    // and null for JOB_RUNNING_INFO / JOB_COMPLETE_INFO, where the timestamp is meaningless.
+    public Long timestamp;
+
+    /**
+     * @param jobID     id of the job the message belongs to
+     * @param type      resource type carried by the message
+     * @param timestamp fetch time, or {@code null} when it is meaningless for the type
+     */
+    public RunningJobMessageId(String jobID, JobConstants.ResourceType type, Long timestamp) {
+        this.jobID = jobID;
+        this.type = type;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final RunningJobMessageId other = (RunningJobMessageId) obj;
+        return Objects.equal(this.jobID, other.jobID)
+                && Objects.equal(this.type, other.type)
+                && Objects.equal(this.timestamp, other.timestamp);
+    }
+
+    /**
+     * Null-safe hash over the three fields.
+     * <p>
+     * The fields themselves are handed to {@code Objects.hashCode}; calling
+     * {@code timestamp.hashCode()} directly would throw for ids built with a null timestamp.
+     * For non-null fields the resulting value is the same as before.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(jobID, type, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        // Concatenating the enum itself prints its name and tolerates a null type.
+        return "jobID=" + jobID
+                + ", type=" + type
+                + ", timestamp= " + timestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
new file mode 100644
index 0000000..6a1d7c8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.common;
+
+/**
+ * String constants and enums shared by the running-job crawler classes.
+ */
+public class JobConstants {
+
+    // ---- id prefixes ----
+    public static final String APPLICATION_PREFIX = "application";
+    public static final String JOB_PREFIX = "job";
+
+    // ---- cluster application listing paths (relative, no leading slash) ----
+    public static final String V2_APPS_URL = "ws/v1/cluster/apps";
+    public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
+    public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";
+    public static final String V2_COMPLETE_APPS_URL = "ws/v1/cluster/apps/";
+    public static final String ANONYMOUS_PARAMETER = "anonymous=true";
+
+    // ---- proxy / MapReduce job paths ----
+    public static final String V2_PROXY_PREFIX_URL = "proxy/";
+    public static final String V2_APP_DETAIL_URL = "/ws/v1/mapreduce/jobs";
+    public static final String V2_MR_APPMASTER_PREFIX = "/ws/v1/mapreduce/jobs/";
+    public static final String V2_CONF_URL = "/conf";
+    public static final String V2_MR_COUNTERS_URL = "/counters";
+
+    // ---- miscellaneous keys and state names ----
+    public static final String HIVE_QUERY_STRING = "hive.query.string";
+    public static final String JOB_STATE_RUNNING = "RUNNING";
+
+    /** Application types distinguished by the crawler. */
+    public enum YarnApplicationType {
+        MAPREDUCE,
+        UNKNOWN
+    }
+
+    /** Compression options. */
+    public enum CompressionType {
+        GZIP,
+        NONE
+    }
+
+    /** Kinds of resources that can be fetched and delivered to callbacks. */
+    public enum ResourceType {
+        JOB_RUNNING_INFO,
+        JOB_COMPLETE_INFO,
+        JOB_CONFIGURATION,
+        JOB_LIST
+    }
+
+    /** Job state selectors. */
+    public enum JobState {
+        RUNNING,
+        COMPLETED,
+        ALL
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
new file mode 100644
index 0000000..b17a41d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.config;
+
+import java.io.Serializable;
+
+import org.apache.eagle.job.JobPartitioner;
+
+/**
+ * Serializable bundle of the three configuration groups used by the running-job crawler:
+ * endpoints, control switches and ZooKeeper state settings.
+ */
+public class RunningJobCrawlConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public RunningJobEndpointConfig endPointConfig;
+    public ControlConfig controlConfig;
+    public ZKStateConfig zkStateConfig;
+
+    /**
+     * Stores the three given configuration groups as-is (no copies, no validation).
+     */
+    public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig,
+                                 ControlConfig controlConfig,
+                                 ZKStateConfig zkStateConfig) {
+        this.zkStateConfig = zkStateConfig;
+        this.controlConfig = controlConfig;
+        this.endPointConfig = endPointConfig;
+    }
+
+    /**
+     * Endpoint base paths.
+     * NOTE(review): "RM"/"HS" presumably stand for ResourceManager / HistoryServer - confirm.
+     */
+    public static class RunningJobEndpointConfig implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public String[] RMBasePaths;
+        public String HSBasePath;
+    }
+
+    /**
+     * Switches and sizing values for the crawler.
+     * NOTE(review): units are only hinted at by the field names ("Inday", "InMin") - confirm.
+     */
+    public static class ControlConfig implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public boolean jobConfigEnabled;
+        public boolean jobInfoEnabled;
+        public int zkCleanupTimeInday;
+        public int completedJobOutofDateTimeInMin;
+        public int sizeOfJobConfigQueue;
+        public int sizeOfJobCompletedInfoQueue;
+        public Class<? extends JobPartitioner> partitionerCls;
+        // defaults to a single partition
+        public int numTotalPartitions = 1;
+    }
+
+    /**
+     * ZooKeeper connection settings used for crawler state.
+     * NOTE(review): the unit of zkRetryInterval is not visible here - confirm.
+     */
+    public static class ZKStateConfig implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
new file mode 100644
index 0000000..146fbe8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.List;
+
+/**
+ * Describes a group of counters and gives access to the {@link CounterKey}s it contains.
+ * <p>
+ * NOTE(review): the exact contract (null handling, index base) is defined by the
+ * implementations, which are not visible here - confirm before relying on it.
+ */
+public interface CounterGroupKey {
+
+    /** @return the name of this group */
+    String getName();
+
+    /** @return the description of this group */
+    String getDescription();
+
+    /** @return the index of this group */
+    int getIndex();
+
+    /** @return the counter number of this group (presumably how many counters it holds - confirm) */
+    int getCounterNumber();
+
+    /** @return the counter keys of this group */
+    List<CounterKey> listCounterKeys();
+
+    /** @return the counter key registered under {@code name} */
+    CounterKey getCounterKeyByName(String name);
+
+    /** @return the counter key registered under {@code index} */
+    CounterKey getCounterKeyByID(int index);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
new file mode 100644
index 0000000..a39d7b4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.List;
+
+/**
+ * Describes a single counter and links it back to its {@link CounterGroupKey}.
+ * <p>
+ * NOTE(review): why a counter has several names is not visible here - confirm
+ * against the implementations.
+ */
+public interface CounterKey {
+
+    /** @return the names of this counter */
+    List<String> getNames();
+
+    /** @return the description of this counter */
+    String getDescription();
+
+    /** @return the index of this counter */
+    int getIndex();
+
+    /** @return the group key this counter is associated with */
+    CounterGroupKey getGroupKey();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
new file mode 100644
index 0000000..ba44ac6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Mutable holder for a two-level counter map: outer key to (inner key to value).
+ * A fresh instance starts with an empty {@link TreeMap}.
+ */
+public final class JobCounters {
+
+    private Map<String, Map<String, Long>> counters = new TreeMap<String, Map<String, Long>>();
+
+    /**
+     * @return the map currently held (the live instance, not a copy); may be {@code null}
+     *         if {@link #setCounters(Map)} was called with {@code null}
+     */
+    public Map<String, Map<String, Long>> getCounters() {
+        return counters;
+    }
+
+    /**
+     * Replaces the held map with the given one; the reference is stored as-is.
+     *
+     * @param counters new counter map
+     */
+    public void setCounters(Map<String, Map<String, Long>> counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * @return the string form of the held map, or {@code "null"} when no map is set
+     */
+    @Override
+    public String toString() {
+        // String.valueOf keeps toString() usable (e.g. in log statements) after setCounters(null).
+        return String.valueOf(counters);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
new file mode 100644
index 0000000..d0f3ae9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter.parser;
+
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+
+/**
+ * Extracts job counters from a parsed HTML/XML {@link Document}.
+ */
+public interface JobCountersParser {
+
+    /**
+     * Parses the counters contained in the given document.
+     *
+     * @param doc document to read the counters from
+     * @return counter values keyed by counter name
+     */
+    Map<String, Long> parse(Document doc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
new file mode 100644
index 0000000..f098c54
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.counter.parser;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+import org.jsoup.nodes.Element;
+import org.jsoup.select.Elements;
+
+/**
+ * {@link JobCountersParser} that scrapes counters from anchors whose {@code href}
+ * contains {@code singlejobcounter}.
+ */
+public class JobCountersParserImpl implements JobCountersParser {
+
+    /**
+     * For every matching anchor, the text of the anchor's parent element is the counter name and
+     * the text of the parent's third following sibling element is the counter value. Commas are
+     * stripped and the text is trimmed before it is parsed as a {@code long}.
+     *
+     * @param doc document to scan
+     * @return counter name to value; a name seen more than once keeps its last value
+     * @throws NumberFormatException if a value cell does not hold a parsable number
+     */
+    @Override
+    public Map<String, Long> parse(Document doc) {
+        Elements anchors = doc.select("a[href*=singlejobcounter]");
+        Map<String, Long> result = new HashMap<String, Long>();
+        for (Element anchor : anchors) {
+            Element nameCell = anchor.parent();
+            String metricName = nameCell.text();
+            // the value sits three element siblings after the name cell
+            Element valueCell = nameCell.nextElementSibling()
+                    .nextElementSibling()
+                    .nextElementSibling();
+            String digits = valueCell.text().replace(",", "").trim();
+            result.put(metricName, Long.parseLong(digits));
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
new file mode 100644
index 0000000..d9cf79c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+
+public class ConfigWorkTask implements Runnable {
+
+ public JobContext context;
+ public ResourceFetcher fetcher;
+ public RunningJobCallback callback;
+ public RunningJobCrawler crawler;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigWorkTask.class);
+
+ public ConfigWorkTask(JobContext context, ResourceFetcher fetcher, RunningJobCallback callback, RunningJobCrawler crawler) {
+ this.context = context;
+ this.fetcher = fetcher;
+ this.callback = callback;
+ this.crawler = crawler;
+ }
+
+ public void run() {
+ runConfigCrawlerWorkhread(context);
+ }
+
+ private void runConfigCrawlerWorkhread(JobContext context) {
+ LOG.info("Going to fetch job configuration information, jobId:" + context.jobId);
+ try {
+ List<Object> objs = fetcher.getResource(ResourceType.JOB_CONFIGURATION, JobUtils.getAppIDByJobID(context.jobId));
+ callback.onJobRunningInformation(context, ResourceType.JOB_CONFIGURATION, objs);
+ }
+ catch (Exception ex) {
+ if (ex.getMessage().contains("Server returned HTTP response code: 500")) {
+ LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
+ // the job remains in processing list, thus we will not do infructuous retry next round
+ // TODO need remove it from processing list when job finished to avoid memory leak
+ }
+ else {
+ LOG.warn("Got an exception when fetching job config: ", ex);
+ crawler.removeFromProcessingList(ResourceType.JOB_CONFIGURATION, context);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
new file mode 100644
index 0000000..7599457
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+/**
+ * Identifies a job being crawled: job id, owning user and the time it was fetched.
+ * <p>
+ * Equality and hashing are based on {@code jobId} only; all fields may be {@code null}.
+ */
+public class JobContext {
+    public String jobId;
+    public String user;
+    public Long fetchedTime;
+
+    /** Creates a context with all fields unset ({@code null}). */
+    public JobContext() {
+
+    }
+
+    /**
+     * Copy constructor.
+     * <p>
+     * {@link String} and {@link Long} are immutable, so the references are shared instead of
+     * cloned; this also lets a context with unset fields be copied without a
+     * {@link NullPointerException}.
+     *
+     * @param context context to copy; must not be {@code null}
+     */
+    public JobContext(JobContext context) {
+        this.jobId = context.jobId;
+        this.user = context.user;
+        this.fetchedTime = context.fetchedTime;
+    }
+
+    /**
+     * @param jobId       id of the job
+     * @param user        user of the job
+     * @param fetchedTime time the job was fetched, may be {@code null}
+     */
+    public JobContext(String jobId, String user, Long fetchedTime) {
+        this.jobId = jobId;
+        this.user = user;
+        this.fetchedTime = fetchedTime;
+    }
+
+    /** @return the hash of {@code jobId}, or 0 when it is unset */
+    @Override
+    public int hashCode() {
+        return jobId == null ? 0 : jobId.hashCode();
+    }
+
+    /** Two contexts are equal when their job ids are equal (two unset ids count as equal). */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof JobContext)) {
+            return false;
+        }
+        JobContext other = (JobContext) obj;
+        return jobId == null ? other.jobId == null : jobId.equals(other.jobId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
new file mode 100644
index 0000000..7c41431
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.ZipException;
+
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.eagle.jobrunning.counter.parser.JobCountersParser;
+import org.apache.eagle.jobrunning.counter.parser.JobCountersParserImpl;
+import org.apache.eagle.jobrunning.ha.HAURLSelector;
+import org.apache.eagle.jobrunning.ha.HAURLSelectorImpl;
+import org.apache.eagle.jobrunning.job.conf.JobConfParser;
+import org.apache.eagle.jobrunning.job.conf.JobConfParserImpl;
+import org.apache.eagle.jobrunning.util.InputStreamUtils;
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.apache.eagle.jobrunning.util.URLConnectionUtils;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.yarn.model.AppWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobCompleteWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobCountersWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobsWrapper;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.jsoup.Jsoup;
+import org.jsoup.nodes.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.jobrunning.url.JobCompleteCounterServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCompleteDetailServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCompletedConfigServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobCountersServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobDetailServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobRunningConfigServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.JobStatusServiceURLBuilderImpl;
+import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
+import org.apache.eagle.jobrunning.yarn.model.AppInfo;
+import org.apache.eagle.jobrunning.yarn.model.AppsWrapper;
+import org.apache.eagle.jobrunning.yarn.model.JobDetailInfo;
+
+public class RMResourceFetcher implements ResourceFetcher{
+
+ private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
+ private final HAURLSelector selector;
+ private final String historyBaseUrl;
+ private final ServiceURLBuilder jobListServiceURLBuilder;
+ private final ServiceURLBuilder jobDetailServiceURLBuilder;
+ private final ServiceURLBuilder jobCounterServiceURLBuilder;
+ private final ServiceURLBuilder jobRunningConfigServiceURLBuilder;
+ private final ServiceURLBuilder jobCompleteDetailServiceURLBuilder;
+ private final ServiceURLBuilder jobCompleteCounterServiceURLBuilder;
+ private final ServiceURLBuilder jobCompletedConfigServiceURLBuilder;
+ private final ServiceURLBuilder jobStatusServiceURLBuilder;
+
+ private static final int CONNECTION_TIMEOUT = 10000;
+ private static final int READ_TIMEOUT = 10000;
+ private static final String XML_HTTP_HEADER = "Accept";
+ private static final String XML_FORMAT = "application/xml";
+
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public RMResourceFetcher(RunningJobCrawlConfig.RunningJobEndpointConfig config) {
+ this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+ this.jobDetailServiceURLBuilder = new JobDetailServiceURLBuilderImpl();
+ this.jobCounterServiceURLBuilder = new JobCountersServiceURLBuilderImpl();
+ this.jobRunningConfigServiceURLBuilder = new JobRunningConfigServiceURLBuilderImpl();
+ this.jobCompleteDetailServiceURLBuilder = new JobCompleteDetailServiceURLBuilderImpl();
+ this.jobCompleteCounterServiceURLBuilder = new JobCompleteCounterServiceURLBuilderImpl();
+ this.jobCompletedConfigServiceURLBuilder = new JobCompletedConfigServiceURLBuilderImpl();
+ this.jobStatusServiceURLBuilder = new JobStatusServiceURLBuilderImpl();
+
+ this.selector = new HAURLSelectorImpl(config.RMBasePaths, jobListServiceURLBuilder, JobConstants.CompressionType.GZIP);
+ this.historyBaseUrl = config.HSBasePath;
+ }
+
+ private void checkUrl() throws IOException {
+ if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), JobConstants.JobState.RUNNING.name()))) {
+ selector.reSelectUrl();
+ }
+ }
+
+ private List<Object> doFetchApplicationsList(String state) throws Exception {
+ List<AppInfo> result = null;
+ InputStream is = null;
+ try {
+ checkUrl();
+ final String urlString = jobListServiceURLBuilder.build(selector.getSelectedUrl(), state);
+ LOG.info("Going to call yarn api to fetch running job list: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+ final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+ if (appWrapper != null && appWrapper.getApps() != null
+ && appWrapper.getApps().getApp() != null) {
+ result = appWrapper.getApps().getApp();
+ return Arrays.asList((Object)result);
+ }
+ return null;
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ }
+ }
+
+ private List<Object> doFetchRunningJobInfo(String appID) throws Exception{
+ InputStream is = null;
+ InputStream is2 = null;
+ try {
+ final String urlString = jobDetailServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+ LOG.info("Going to fetch job detail information for " + appID + " , url: " + urlString);
+ try {
+ is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+ }
+ catch (ZipException ex) {
+ // Here if job already completed, it will be redirected to job history page and throw java.util.zip.ZipException
+ LOG.info(appID + " has finished, skip this job");
+ return null;
+ }
+ final JobsWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobsWrapper.class);
+ JobDetailInfo jobDetail = null;
+ if (jobWrapper != null && jobWrapper.getJobs() != null && jobWrapper.getJobs().getJob() != null
+ && jobWrapper.getJobs().getJob().size() > 0) {
+ jobDetail = jobWrapper.getJobs().getJob().get(0);
+ }
+ final String urlString2 = jobCounterServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+ LOG.info("Going to fetch job counters for application " + appID + " , url: " + urlString2);
+ is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.GZIP);
+ final JobCountersWrapper jobCounterWrapper = OBJ_MAPPER.readValue(is2,JobCountersWrapper.class);
+
+ return Arrays.asList(jobDetail, jobCounterWrapper);
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ if (is2 != null) { try {is2.close();} catch (Exception e){} }
+ }
+ }
+
+ private List<Object> doFetchCompleteJobInfo(String appId) throws Exception{
+ InputStream is = null;
+ InputStream is2 = null;
+ try {
+ checkUrl();
+ String jobID = JobUtils.getJobIDByAppID(appId);
+ String urlString = jobCompleteDetailServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
+ LOG.info("Going to fetch job completed information for " + jobID + " , url: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+ final JobCompleteWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobCompleteWrapper.class);
+
+ String urlString2 = jobCompleteCounterServiceURLBuilder.build(historyBaseUrl, jobID);
+ LOG.info("Going to fetch job completed counters for " + jobID + " , url: " + urlString2);
+ is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.NONE, (int) (2 * DateUtils.MILLIS_PER_MINUTE));
+ final Document doc = Jsoup.parse(is2, StandardCharsets.UTF_8.name(), urlString2);
+ JobCountersParser parser = new JobCountersParserImpl();
+ Map<String, Long> counters = parser.parse(doc);
+ return Arrays.asList(jobWrapper, counters);
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ if (is2 != null) { try {is2.close();} catch (Exception e){} }
+ }
+ }
+
+ private List<Object> doFetchRunningJobConfiguration(String appID) throws Exception {
+ InputStream is = null;
+ try {
+ checkUrl();
+ String jobID = JobUtils.getJobIDByAppID(appID);
+ String urlString = jobRunningConfigServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
+ LOG.info("Going to fetch job completed information for " + jobID + " , url: " + urlString);
+ final URLConnection connection = URLConnectionUtils.getConnection(urlString);
+ connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
+ connection.setConnectTimeout(CONNECTION_TIMEOUT);
+ connection.setReadTimeout(READ_TIMEOUT);
+ is = connection.getInputStream();
+ Map<String, String> configs = XmlHelper.getConfigs(is);
+ return Arrays.asList((Object)configs);
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ }
+ }
+
+ private List<Object> doFetchCompletedJobConfiguration(String appID) throws Exception {
+ InputStream is = null;
+ try {
+ String urlString = jobCompletedConfigServiceURLBuilder.build(historyBaseUrl, JobUtils.getJobIDByAppID(appID));
+ is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.NONE);
+ final Document doc = Jsoup.parse(is, "UTF-8", urlString);
+ JobConfParser parser = new JobConfParserImpl();
+ Map<String, String> configs = parser.parse(doc);
+ return Arrays.asList((Object)configs);
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ }
+ }
+
+ public boolean checkIfJobIsRunning(String appID) throws Exception{
+ InputStream is = null;
+ try {
+ checkUrl();
+ final String urlString = jobStatusServiceURLBuilder.build(selector.getSelectedUrl(), appID);
+ LOG.info("Going to call yarn api to fetch job status: " + urlString);
+ is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
+ final AppWrapper appWrapper = OBJ_MAPPER.readValue(is, AppWrapper.class);
+ if (appWrapper != null && appWrapper.getApp() != null) {
+ AppInfo result = appWrapper.getApp();
+ if (result.getState().equals(JobConstants.JOB_STATE_RUNNING)) {
+ return true;
+ }
+ return false;
+ }
+ else {
+ LOG.error("The status of " + appID + " is not available");
+ throw new IllegalStateException("The status of " + appID + " is not available");
+ }
+ }
+ finally {
+ if (is != null) { try {is.close();} catch (Exception e){} }
+ }
+ }
+
+ public List<Object> getResource(JobConstants.ResourceType resoureType, Object... parameter) throws Exception{
+ switch(resoureType) {
+ case JOB_LIST:
+ return doFetchApplicationsList((String)parameter[0]);
+ case JOB_RUNNING_INFO:
+ //parameter[0]= appId
+ return doFetchRunningJobInfo((String)parameter[0]);
+ case JOB_COMPLETE_INFO:
+ //parameter[0]= appId
+ return doFetchCompleteJobInfo((String)parameter[0]);
+ case JOB_CONFIGURATION:
+ //parameter[0]= appId
+ boolean isRunning = checkIfJobIsRunning((String)parameter[0]);
+ if (isRunning)
+ return doFetchRunningJobConfiguration((String)parameter[0]);
+ else
+ return doFetchCompletedJobConfiguration((String)parameter[0]);
+ default:
+ throw new Exception("Not support ressourceType :" + resoureType);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
new file mode 100644
index 0000000..5a5150b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.List;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+public interface ResourceFetcher {
+ /**
+  * Fetches the resource identified by {@code resoureType}.
+  *
+  * <p>What {@code parameter} means is left to the implementation. RMResourceFetcher in this
+  * package reads {@code parameter[0]} as a String: the job state for {@code JOB_LIST} and the
+  * application id for the other resource types.
+  *
+  * @param resoureType kind of resource to fetch
+  * @param parameter   type-specific arguments
+  * @return the fetched objects; callers in this package check the result for {@code null}
+  *         because RMResourceFetcher returns {@code null} when nothing is available
+  * @throws Exception when fetching or parsing fails
+  */
+ List<Object> getResource(JobConstants.ResourceType resoureType, Object... parameter) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
new file mode 100644
index 0000000..103a2ff
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+
+/**
+ * Crawls running jobs and keeps track of the jobs that are currently being processed,
+ * grouped by resource type.
+ */
+public interface RunningJobCrawler {
+
+    /**
+     * Runs one crawl round.
+     *
+     * @throws Exception when the round cannot be completed
+     */
+    void crawl() throws Exception;
+
+    /**
+     * Marks {@code context} as being processed for the given resource type.
+     *
+     * @param type    resource type the job is processed for
+     * @param context job to mark
+     */
+    void addIntoProcessingList(ResourceType type, JobContext context);
+
+    /**
+     * Clears the processing mark of {@code context} for the given resource type.
+     *
+     * @param type    resource type the job was processed for
+     * @param context job to unmark
+     */
+    void removeFromProcessingList(ResourceType type, JobContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
new file mode 100644
index 0000000..72a340a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.job.JobFilter;
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+import org.apache.eagle.jobrunning.common.JobConstants.JobState;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+import org.apache.eagle.jobrunning.common.JobConstants.YarnApplicationType;
+import org.apache.eagle.jobrunning.util.JobUtils;
+import org.apache.eagle.jobrunning.yarn.model.AppInfo;
+import org.apache.eagle.common.DateTimeUtil;
+
+/**
+ * {@link RunningJobCrawler} that polls the resource manager through a {@link ResourceFetcher}
+ * and reports results to a {@link RunningJobCallback}.
+ *
+ * <p>Besides the synchronous {@link #crawl()} round, up to four daemon threads are created in the
+ * constructor and started on the first {@code crawl()} call:
+ * <ul>
+ *   <li>a job-config dispatcher feeding a fixed worker pool from {@code queueOfConfig},</li>
+ *   <li>a completed-job-info fetcher draining {@code queueOfCompleteJobInfo},</li>
+ *   <li>a completed-job list poller,</li>
+ *   <li>a ZooKeeper state cleanup loop.</li>
+ * </ul>
+ * All background loops survive exceptions and interrupts; they never terminate on their own.
+ */
+public class RunningJobCrawlerImpl implements RunningJobCrawler {
+
+    protected RunningJobCrawlConfig.RunningJobEndpointConfig endpointConfig;
+    protected RunningJobCrawlConfig.ControlConfig controlConfig;
+    protected JobFilter jobFilter;
+    private ResourceFetcher fetcher;
+    private JobRunningZKStateManager zkStateManager;
+    private Thread jobConfigProcessThread;
+    private Thread jobCompleteInfoProcessThread;
+    private Thread jobCompleteStatusCheckerThread;
+    private Thread zkCleanupThread;
+    private final RunningJobCallback callback;
+    private ReadWriteLock readWriteLock;
+    private Map<ResourceType, Map<String, JobContext>> processingJobMap = new ConcurrentHashMap<ResourceType, Map<String, JobContext>>();
+
+    private BlockingQueue<JobContext> queueOfConfig;
+    private BlockingQueue<JobContext> queueOfCompleteJobInfo;
+    private static final int DEFAULT_CONFIG_THREAD_COUNT = 20;
+    private static final long DELAY_TO_UPDATE_COMPLETION_JOB_INFO = 5 * DateUtils.MILLIS_PER_MINUTE;
+    private static final long COMPLETED_JOB_POLL_INTERVAL_MS = 20 * 1000L;
+    private static final long ZK_CLEANUP_INTERVAL_MS = 30 * 60 * 1000L;
+    private static final String HTTP_500_MARKER = "Server returned HTTP response code: 500";
+    private static final Logger LOG = LoggerFactory.getLogger(RunningJobCrawlerImpl.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    /**
+     * Creates the crawler and its (not yet started) background threads.
+     *
+     * @param config         endpoint and control configuration
+     * @param zkStateManager source of already processed job ids and target of the cleanup loop
+     * @param callback       receiver of fetched job information
+     * @param jobFilter      decides per user whether a job is crawled
+     * @param readWriteLock  lock whose write side guards changes to the processing queues and lists
+     */
+    public RunningJobCrawlerImpl(RunningJobCrawlConfig config, JobRunningZKStateManager zkStateManager,
+                                 RunningJobCallback callback, JobFilter jobFilter, ReadWriteLock readWriteLock) {
+        this.endpointConfig = config.endPointConfig;
+        this.controlConfig = config.controlConfig;
+        this.callback = callback;
+        this.fetcher = new RMResourceFetcher(endpointConfig);
+        this.jobFilter = jobFilter;
+        this.readWriteLock = readWriteLock;
+        if (config.controlConfig.jobInfoEnabled) {
+            jobCompleteInfoProcessThread = newDaemonThread("JobCompleteInfo-process-thread", new Runnable() {
+                @Override
+                public void run() {
+                    startCompleteJobInfoProcessThread();
+                }
+            });
+            jobCompleteStatusCheckerThread = newDaemonThread("JobComplete-statusChecker-thread", new Runnable() {
+                @Override
+                public void run() {
+                    startCompleteStatusCheckerThread();
+                }
+            });
+        }
+
+        if (config.controlConfig.jobConfigEnabled) {
+            jobConfigProcessThread = newDaemonThread("JobConfig-process-thread", new Runnable() {
+                @Override
+                public void run() {
+                    startJobConfigProcessThread();
+                }
+            });
+        }
+
+        zkCleanupThread = newDaemonThread("zk-cleanup-thread", new Runnable() {
+            @Override
+            public void run() {
+                startzkCleanupThread();
+            }
+        });
+
+        this.zkStateManager = zkStateManager;
+        this.processingJobMap.put(ResourceType.JOB_CONFIGURATION, new ConcurrentHashMap<String, JobContext>());
+        this.processingJobMap.put(ResourceType.JOB_COMPLETE_INFO, new ConcurrentHashMap<String, JobContext>());
+        this.queueOfConfig = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobConfigQueue);
+        this.queueOfCompleteJobInfo = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobCompletedInfoQueue);
+    }
+
+    /** Creates a named daemon thread without starting it. */
+    private static Thread newDaemonThread(String name, Runnable task) {
+        Thread thread = new Thread(task, name);
+        thread.setDaemon(true);
+        return thread;
+    }
+
+    /**
+     * Starts the thread only if it has never been started. {@code Thread.start()} on a terminated
+     * thread throws IllegalThreadStateException, which would make every later crawl round fail.
+     */
+    private static void startIfNew(Thread thread) {
+        if (thread == null) {
+            return;
+        }
+        final Thread.State state = thread.getState();
+        if (state == Thread.State.NEW) {
+            thread.start();
+        } else if (state == Thread.State.TERMINATED) {
+            LOG.error("Thread " + thread.getName() + " has terminated and cannot be restarted");
+        }
+    }
+
+    /** Sleeps for the given time; an interrupt only shortens the pause. */
+    private static void pause(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while pausing between polls: " + e.getMessage());
+        }
+    }
+
+    /** Blocks until {@code deadlineMillis} has passed; interrupts are ignored and the wait resumes. */
+    private static void sleepUntil(long deadlineMillis) {
+        long remaining = deadlineMillis - System.currentTimeMillis();
+        while (remaining > 0) {
+            try {
+                Thread.sleep(remaining);
+            } catch (InterruptedException ignored) {
+                // the delay is mandatory; keep waiting for the rest of it
+            }
+            remaining = deadlineMillis - System.currentTimeMillis();
+        }
+    }
+
+    /** True when the failure message carries the HTTP 500 marker; tolerates a null message. */
+    private static boolean isHttp500(Throwable failure) {
+        final String message = failure.getMessage();
+        return message != null && message.contains(HTTP_500_MARKER);
+    }
+
+    /** Logs a fetch failure; HTTP 500 is downgraded to a warning and the job is skipped. */
+    private static void logFetchFailure(JobContext context, Exception ex) {
+        if (isHttp500(ex)) {
+            LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
+            // the job remains in processing list, thus we will not do infructuous retry next round
+            // TODO need remove it from processing list when job finished to avoid memory leak
+        } else {
+            LOG.error("Got an exception when fetching resource, jobId: " + context.jobId, ex);
+        }
+    }
+
+    /** Only MapReduce applications whose user passes the job filter are crawled. */
+    private boolean accepts(AppInfo app) {
+        return YarnApplicationType.MAPREDUCE.name().equals(app.getApplicationType())
+                && jobFilter.accept(app.getUser());
+    }
+
+    /** Builds a context for the application, stamped with the current time. */
+    private static JobContext toContext(AppInfo app) {
+        return new JobContext(JobUtils.getJobIDByAppID(app.getId()), app.getUser(), System.currentTimeMillis());
+    }
+
+    /** Dispatcher loop: takes contexts from {@code queueOfConfig} and hands them to the worker pool. */
+    private void startJobConfigProcessThread() {
+        final int configThreadCount = DEFAULT_CONFIG_THREAD_COUNT;
+        LOG.info("Job Config crawler main thread started, pool size: " + configThreadCount);
+
+        ThreadFactory factory = new ThreadFactory() {
+            private final AtomicInteger count = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable runnable) {
+                Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+                // use the value returned by the increment so concurrent calls cannot share a name
+                thread.setName("config-crawler-workthread-" + count.incrementAndGet());
+                return thread;
+            }
+        };
+
+        ThreadPoolExecutor pool = new ThreadPoolExecutor(configThreadCount, configThreadCount, 0L,
+                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+
+        while (true) {
+            try {
+                JobContext context = queueOfConfig.take();
+                LOG.info("queueOfConfig size: " + queueOfConfig.size());
+                Runnable configCrawlerThread = new ConfigWorkTask(new JobContext(context), fetcher, callback, this);
+                pool.execute(configCrawlerThread);
+            } catch (InterruptedException e) {
+                LOG.warn("Got an InterruptedException: " + e.getMessage());
+            } catch (RejectedExecutionException e2) {
+                LOG.warn("Got RejectedExecutionException: " + e2.getMessage());
+            } catch (Throwable t) {
+                LOG.warn("Got an throwable t, " + t.getMessage(), t);
+            }
+        }
+    }
+
+    /**
+     * Loop fetching information of completed jobs from {@code queueOfCompleteJobInfo}.
+     *
+     * <p>The fetch is delayed until 5 minutes after the job's fetched time, because the history
+     * url needs some time to become accessible.
+     */
+    private void startCompleteJobInfoProcessThread() {
+        while (true) {
+            final JobContext context;
+            try {
+                context = queueOfCompleteJobInfo.take();
+            } catch (InterruptedException ex) {
+                // nothing was taken; continuing with a null context would kill this thread
+                LOG.warn("Interrupted while waiting for a completed job: " + ex.getMessage());
+                continue;
+            }
+            final long fetchedAt = context.fetchedTime != null ? context.fetchedTime : System.currentTimeMillis();
+            sleepUntil(fetchedAt + DELAY_TO_UPDATE_COMPLETION_JOB_INFO);
+            try {
+                List<Object> objs = fetcher.getResource(ResourceType.JOB_COMPLETE_INFO, JobUtils.getAppIDByJobID(context.jobId));
+                callback.onJobRunningInformation(context, ResourceType.JOB_COMPLETE_INFO, objs);
+            } catch (Exception ex) {
+                logFetchFailure(context, ex);
+            }
+        }
+    }
+
+    /**
+     * Loop polling the completed-job list and queueing recently finished jobs for config and
+     * completed-info processing. Pauses 20 seconds between polls, also after an empty result or a
+     * failure, so an unavailable resource manager is not hammered.
+     */
+    public void startCompleteStatusCheckerThread() {
+        while (true) {
+            try {
+                List<Object> list = fetcher.getResource(ResourceType.JOB_LIST, JobState.COMPLETED.name());
+                if (list == null) {
+                    LOG.warn("Current Completed Job List is Empty");
+                } else {
+                    @SuppressWarnings("unchecked")
+                    List<AppInfo> apps = (List<AppInfo>) list.get(0);
+                    Set<JobContext> completedJobSet = new HashSet<JobContext>();
+                    for (AppInfo app : apps) {
+                        if (!accepts(app)) {
+                            continue;
+                        }
+                        if (System.currentTimeMillis() - app.getFinishedTime() < controlConfig.completedJobOutofDateTimeInMin * DateUtils.MILLIS_PER_MINUTE) {
+                            completedJobSet.add(toContext(app));
+                        }
+                    }
+
+                    if (controlConfig.jobConfigEnabled) {
+                        addIntoProcessingQueueAndList(completedJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
+                    }
+
+                    if (controlConfig.jobInfoEnabled) {
+                        addIntoProcessingQueueAndList(completedJobSet, queueOfCompleteJobInfo, ResourceType.JOB_COMPLETE_INFO);
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("Got a throwable in fetching job completed list :", t);
+            }
+            pause(COMPLETED_JOB_POLL_INTERVAL_MS);
+        }
+    }
+
+    /**
+     * Loop truncating ZooKeeper job state older than {@code controlConfig.zkCleanupTimeInday} days.
+     * Pauses 30 minutes between runs, also after a failure.
+     */
+    public void startzkCleanupThread() {
+        LOG.info("zk cleanup thread started");
+        while (true) {
+            try {
+                long thresholdTime = System.currentTimeMillis() - controlConfig.zkCleanupTimeInday * DateUtils.MILLIS_PER_DAY;
+                String date = DateTimeUtil.format(thresholdTime, "yyyyMMdd");
+                zkStateManager.truncateJobBefore(ResourceType.JOB_CONFIGURATION, date);
+                zkStateManager.truncateJobBefore(ResourceType.JOB_COMPLETE_INFO, date);
+            } catch (Throwable t) {
+                LOG.warn("Got an throwable, t: ", t);
+            }
+            pause(ZK_CLEANUP_INTERVAL_MS);
+        }
+    }
+
+    /**
+     * Queues every job of {@code jobSet} that is neither recorded as processed in ZooKeeper nor
+     * currently in the processing list of {@code type}. Runs under the write lock.
+     *
+     * <p>A job that does not fit into a full queue is removed from the processing list again, so
+     * a later round can retry it instead of the job being marked but never processed.
+     *
+     * @param jobSet candidate jobs
+     * @param queue  queue the new jobs are offered to
+     * @param type   resource type the jobs are processed for
+     */
+    public void addIntoProcessingQueueAndList(Set<JobContext> jobSet, BlockingQueue<JobContext> queue, ResourceType type) {
+        // acquire outside the try so the finally block only unlocks a lock that is actually held
+        readWriteLock.writeLock().lock();
+        LOG.info("Write lock acquired");
+        try {
+            // copy into a set: O(1) lookups and the list returned by the state manager stays untouched
+            Set<String> knownJobIds = new HashSet<String>();
+            List<String> processedJobs = zkStateManager.readProcessedJobs(type);
+            if (processedJobs != null) {
+                knownJobIds.addAll(processedJobs);
+            }
+            knownJobIds.addAll(processingJobMap.get(type).keySet());
+
+            for (JobContext context : jobSet) {
+                if (knownJobIds.contains(context.jobId)) {
+                    continue;
+                }
+                // mark first so a fast worker that finishes early finds the entry it removes
+                addIntoProcessingList(type, context);
+                if (!queue.offer(context)) {
+                    removeFromProcessingList(type, context);
+                    LOG.warn("Queue for " + type + " is full, job " + context.jobId + " will be retried in a later round");
+                }
+            }
+        } finally {
+            try {
+                readWriteLock.writeLock().unlock();
+                LOG.info("Write lock released");
+            } catch (Throwable t) {
+                LOG.error("Fail to release Write lock", t);
+            }
+        }
+    }
+
+    /**
+     * Runs one crawl round: starts the background threads on the first call, fetches the running
+     * job list, queues new jobs for configuration processing and, when job info is enabled,
+     * fetches detail and counters of every running job and passes them to the callback.
+     *
+     * @throws Exception when the running job list cannot be fetched
+     */
+    @Override
+    public void crawl() throws Exception {
+        // bring up crawler threads when crawl method is invoked first time
+        startIfNew(jobConfigProcessThread);
+        startIfNew(jobCompleteInfoProcessThread);
+        startIfNew(jobCompleteStatusCheckerThread);
+        startIfNew(zkCleanupThread);
+
+        List<Object> list = fetcher.getResource(ResourceType.JOB_LIST, JobState.RUNNING.name());
+        if (list == null) {
+            LOG.warn("Current Running Job List is Empty");
+            return;
+        }
+
+        @SuppressWarnings("unchecked")
+        List<AppInfo> apps = (List<AppInfo>) list.get(0);
+        LOG.info("Current Running Job List size : " + apps.size());
+        Set<JobContext> currentRunningJobSet = new HashSet<JobContext>();
+        for (AppInfo app : apps) {
+            if (accepts(app)) {
+                currentRunningJobSet.add(toContext(app));
+            }
+        }
+
+        if (controlConfig.jobConfigEnabled) {
+            addIntoProcessingQueueAndList(currentRunningJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
+        }
+
+        if (controlConfig.jobInfoEnabled) {
+            // fetch job detail & jobcounters
+            for (JobContext context : currentRunningJobSet) {
+                try {
+                    List<Object> objs = fetcher.getResource(ResourceType.JOB_RUNNING_INFO, JobUtils.getAppIDByJobID(context.jobId));
+                    callback.onJobRunningInformation(context, ResourceType.JOB_RUNNING_INFO, objs);
+                } catch (Exception ex) {
+                    logFetchFailure(context, ex);
+                }
+            }
+        }
+    }
+
+    /** Records {@code context} under its job id in the processing map of {@code type}. */
+    @Override
+    public void addIntoProcessingList(ResourceType type, JobContext context) {
+        processingJobMap.get(type).put(context.jobId, context);
+    }
+
+    /** Removes the entry for the context's job id from the processing map of {@code type}. */
+    @Override
+    public void removeFromProcessingList(ResourceType type, JobContext context) {
+        processingJobMap.get(type).remove(context.jobId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
new file mode 100644
index 0000000..9814f5b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.crawler;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+/**
+ * Reads configuration entries from an XML document made of {@code <property>} elements that
+ * carry a {@code <name>} and a {@code <value>} child.
+ */
+public class XmlHelper {
+
+    /**
+     * Parses the stream and returns all properties as a sorted map.
+     *
+     * <p>Children are looked up by tag name, so whitespace between elements (pretty-printed XML)
+     * and additional children do not shift key and value. A property without a {@code <name>}
+     * child is skipped; a property without a {@code <value>} child maps to the empty string.
+     *
+     * <p>The stream comes from a remote service, so the parser is hardened where the JAXP
+     * implementation allows it: documents with a DOCTYPE are rejected and external entities
+     * are not resolved.
+     *
+     * @param is XML input; not closed by this method
+     * @return property names mapped to their values, ordered by name
+     * @throws IOException                  when the stream cannot be read
+     * @throws SAXException                 when the document is malformed or contains a DOCTYPE
+     * @throws ParserConfigurationException when no parser can be created
+     */
+    public static Map<String, String> getConfigs(InputStream is) throws IOException, SAXException, ParserConfigurationException
+    {
+        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+        trySetFeature(dbf, javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, true);
+        trySetFeature(dbf, "http://apache.org/xml/features/disallow-doctype-decl", true);
+        trySetFeature(dbf, "http://xml.org/sax/features/external-general-entities", false);
+        trySetFeature(dbf, "http://xml.org/sax/features/external-parameter-entities", false);
+        DocumentBuilder db = dbf.newDocumentBuilder();
+
+        Document dt = db.parse(is);
+        Element element = dt.getDocumentElement();
+        Map<String, String> config = new TreeMap<String, String>();
+
+        NodeList propertyList = element.getElementsByTagName("property");
+        int length = propertyList.getLength();
+        for (int i = 0; i < length; i++) {
+            Node property = propertyList.item(i);
+            String key = childText(property, "name");
+            if (key == null) {
+                continue;
+            }
+            String value = childText(property, "value");
+            config.put(key, value == null ? "" : value);
+        }
+        return config;
+    }
+
+    /** Returns the text of the first child element with the given tag name, or null if there is none. */
+    private static String childText(Node parent, String tagName) {
+        NodeList children = parent.getChildNodes();
+        for (int i = 0; i < children.getLength(); i++) {
+            Node child = children.item(i);
+            if (child.getNodeType() == Node.ELEMENT_NODE && tagName.equals(child.getNodeName())) {
+                return child.getTextContent();
+            }
+        }
+        return null;
+    }
+
+    /** Sets a parser feature; an implementation that does not know the feature is left as it is. */
+    private static void trySetFeature(DocumentBuilderFactory dbf, String feature, boolean enabled) {
+        try {
+            dbf.setFeature(feature, enabled);
+        } catch (ParserConfigurationException ignored) {
+            // hardening is best effort: parsing must keep working with parsers lacking this feature
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
new file mode 100644
index 0000000..2219868
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.ha;
+
+import java.io.IOException;
+
+/**
+ * Selects one url out of several candidates and lets callers trigger a
+ * re-selection.
+ *
+ * @since Aug 21, 2014
+ */
+public interface HAURLSelector {
+
+ /**
+  * Checks the given url; what exactly is checked is up to the implementation.
+  *
+  * @param url the url to check
+  * @return {@code true} if the url passed the check, {@code false} otherwise
+  */
+ boolean checkUrl(String url);
+
+ /**
+  * Runs the url selection again.
+  *
+  * @throws IOException if the re-selection fails
+  */
+ void reSelectUrl() throws IOException;
+
+ /**
+  * @return the currently selected url
+  */
+ String getSelectedUrl();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
new file mode 100644
index 0000000..21a81ed
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.ha;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
+import org.apache.eagle.jobrunning.util.InputStreamUtils;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HAURLSelector} backed by a fixed array of candidate urls.
+ *
+ * <p>Until a re-selection succeeds, the first candidate is reported as the
+ * selected url. A candidate is considered usable when an input stream can be
+ * opened on the url built for it by the configured {@link ServiceURLBuilder}.
+ */
+public class HAURLSelectorImpl implements HAURLSelector {
+
+    private final String[] urls;
+    private volatile String selectedUrl;
+    private final ServiceURLBuilder builder;
+
+    /** Set while a thread is inside {@link #reSelectUrl()}; lets concurrent callers return early. */
+    private volatile boolean reselectInProgress;
+    private final JobConstants.CompressionType compressionType;
+    /** Number of attempts made on each candidate url before moving on to the next one. */
+    private static final long MAX_RETRY_TIME = 3;
+    private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
+
+    /**
+     * @param urls candidate urls, tried in array order during re-selection
+     * @param builder builds the concrete url that is probed for a candidate
+     * @param compressionType compression type passed on when opening the probe stream
+     */
+    public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, JobConstants.CompressionType compressionType) {
+        this.urls = urls;
+        this.compressionType = compressionType;
+        this.builder = builder;
+    }
+
+    /**
+     * Probes a url by opening an input stream on it and closing the stream again.
+     *
+     * @param urlString the url to probe
+     * @return {@code true} if the stream could be opened, {@code false} if opening it threw
+     */
+    @Override
+    public boolean checkUrl(String urlString) {
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(urlString, compressionType);
+        }
+        catch (Exception ex) {
+            // keep the cause in the log so a failed probe can be diagnosed
+            LOG.info("get inputstream from url: " + urlString + " failed. ", ex);
+            return false;
+        }
+        finally {
+            if (is != null) {
+                try {
+                    is.close();
+                }
+                catch (IOException ignored) {
+                    // the stream was only opened as a probe; a failure to close it is irrelevant
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the selected url; defaults to the first candidate when none has been selected yet
+     */
+    @Override
+    public String getSelectedUrl() {
+        if (selectedUrl == null) {
+            selectedUrl = urls[0];
+        }
+        return selectedUrl;
+    }
+
+    /**
+     * Tries the candidate urls in order, probing each up to {@code MAX_RETRY_TIME}
+     * times with a 5 second pause after every failed probe, and selects the first
+     * one whose probe succeeds.
+     *
+     * <p>Returns immediately without doing anything when another re-selection is
+     * already flagged as in progress.
+     *
+     * @throws IOException if no candidate url could be reached, or if the calling
+     *         thread is interrupted while waiting between probes (the interrupt
+     *         flag is restored in that case)
+     */
+    @Override
+    public void reSelectUrl() throws IOException {
+        if (reselectInProgress) {
+            return;
+        }
+        synchronized (this) {
+            if (reselectInProgress) {
+                return;
+            }
+            reselectInProgress = true;
+            try {
+                LOG.info("Going to reselect url");
+                for (int i = 0; i < urls.length; i++) {
+                    String urlToCheck = urls[i];
+                    LOG.info("Going to try url :" + urlToCheck);
+                    for (int time = 0; time < MAX_RETRY_TIME; time++) {
+                        if (checkUrl(builder.build(urlToCheck, JobConstants.JobState.RUNNING.name()))) {
+                            selectedUrl = urlToCheck;
+                            LOG.info("Successfully switch to new url : " + selectedUrl);
+                            return;
+                        }
+                        LOG.info("try url " + urlToCheck + " fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
+                        try {
+                            Thread.sleep(5 * 1000);
+                        }
+                        catch (InterruptedException ex) {
+                            // Do not keep probing once the thread has been asked to stop:
+                            // restore the flag for the caller and give up.
+                            Thread.currentThread().interrupt();
+                            throw new IOException("Interrupted while reselecting url", ex);
+                        }
+                    }
+                }
+                throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
+            }
+            finally {
+                reselectInProgress = false;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
new file mode 100644
index 0000000..112b73d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.job.conf;
+
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+
+/**
+ * Turns a parsed jsoup {@link Document} into a map of configuration entries.
+ */
+public interface JobConfParser {
+
+ /**
+  * @param doc the parsed document to read the configuration from
+  * @return the configuration as key/value pairs
+  */
+ Map<String, String> parse(Document doc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
new file mode 100644
index 0000000..7a97550
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/job/conf/JobConfParserImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.job.conf;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.jsoup.nodes.Document;
+import org.jsoup.nodes.Element;
+import org.jsoup.select.Elements;
+
+/**
+ * {@link JobConfParser} that reads configuration entries from the rows of the
+ * HTML table with id {@code conf}.
+ */
+public class JobConfParserImpl implements JobConfParser {
+
+    /**
+     * Collects one entry per {@code tbody} row of {@code table[id=conf]}: the text
+     * of the row's first child is the key, the text of its second child the value.
+     * Rows with fewer than two children are skipped.
+     *
+     * @param doc the parsed page containing the configuration table
+     * @return the configuration entries; empty when the table or its rows are absent
+     */
+    @Override
+    public Map<String, String> parse(Document doc) {
+        Map<String, String> configs = new HashMap<String, String>();
+        Elements rows = doc.select("table[id=conf]").select("tbody").select("tr");
+        for (Element row : rows) {
+            Elements cells = row.children();
+            if (cells.size() < 2) {
+                // e.g. a placeholder row spanning the table: it holds no key/value
+                // pair and must not abort parsing of the remaining rows
+                continue;
+            }
+            configs.put(cells.get(0).text(), cells.get(1).text());
+        }
+        return configs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
new file mode 100644
index 0000000..3e601e1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Defines which content in the job running stream should be streamed.
+ */
+public interface JobRunningContentFilter extends Serializable {
+ /**
+  * @param config a job configuration as key/value pairs
+  * @return {@code true} if the job configuration is accepted by this filter
+  */
+ boolean acceptJobConf(Map<String, String> config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
new file mode 100644
index 0000000..b00f195
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningContentFilterImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.Map;
+
+import org.apache.eagle.jobrunning.common.JobConstants;
+
+/**
+ * {@link JobRunningContentFilter} that only lets through job configurations
+ * carrying a {@link JobConstants#HIVE_QUERY_STRING} entry.
+ */
+public class JobRunningContentFilterImpl implements JobRunningContentFilter {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param config the job configuration to inspect
+     * @return {@code true} exactly when {@code config} has the
+     *         {@link JobConstants#HIVE_QUERY_STRING} key
+     */
+    @Override
+    public boolean acceptJobConf(Map<String, String> config) {
+        return config.containsKey(JobConstants.HIVE_QUERY_STRING);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
new file mode 100644
index 0000000..3dd55c1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpout.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.eagle.jobrunning.callback.DefaultRunningJobInputStreamCallback;
+import org.apache.eagle.jobrunning.callback.RunningJobMessageId;
+import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
+import org.apache.eagle.jobrunning.crawler.RunningJobCrawler;
+import org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl;
+import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.crawler.JobContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+
+import org.apache.eagle.job.JobFilter;
+import org.apache.eagle.job.JobFilterByPartition;
+import org.apache.eagle.job.JobPartitioner;
+import org.apache.eagle.jobrunning.callback.RunningJobCallback;
+
+public class JobRunningSpout extends BaseRichSpout {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(JobRunningSpout.class);
+
+ private RunningJobCrawlConfig config;
+ private JobRunningZKStateManager zkStateManager;
+ private transient RunningJobCrawler crawler;
+ private JobRunningSpoutCollectorInterceptor interceptor;
+ private RunningJobCallback callback;
+ private ReadWriteLock readWriteLock;
+ private static final int DEFAULT_WAIT_SECONDS_BETWEEN_ROUNDS = 10;
+
+ public JobRunningSpout(RunningJobCrawlConfig config){
+ this(config, new JobRunningSpoutCollectorInterceptor());
+ }
+
+ /**
+ * mostly this constructor signature is for unit test purpose as you can put customized interceptor here
+ * @param config
+ * @param interceptor
+ */
+ public JobRunningSpout(RunningJobCrawlConfig config, JobRunningSpoutCollectorInterceptor interceptor){
+ this.config = config;
+ this.interceptor = interceptor;
+ this.callback = new DefaultRunningJobInputStreamCallback(interceptor);
+ this.readWriteLock = new ReentrantReadWriteLock();
+ }
+
+
+ /**
+ * TODO: just copy this part from jobHistorySpout, need to move it to a common place
+ * @param context
+ * @return
+ */
+ private int calculatePartitionId(TopologyContext context){
+ int thisGlobalTaskId = context.getThisTaskId();
+ String componentName = context.getComponentId(thisGlobalTaskId);
+ List<Integer> globalTaskIds = context.getComponentTasks(componentName);
+ int index = 0;
+ for(Integer id : globalTaskIds){
+ if(id == thisGlobalTaskId){
+ return index;
+ }
+ index++;
+ }
+ throw new IllegalStateException();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ int partitionId = calculatePartitionId(context);
+ // sanity verify 0<=partitionId<=numTotalPartitions-1
+ if(partitionId < 0 || partitionId > config.controlConfig.numTotalPartitions){
+ throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
+ partitionId + " and numTotalPartitions " + config.controlConfig.numTotalPartitions);
+ }
+ Class<? extends JobPartitioner> partitionerCls = config.controlConfig.partitionerCls;
+ JobPartitioner partitioner = null;
+ try {
+ partitioner = partitionerCls.newInstance();
+ } catch (Exception e) {
+ LOG.error("failing instantiating job partitioner class " + partitionerCls.getCanonicalName());
+ throw new IllegalStateException(e);
+ }
+ JobFilter jobFilter = new JobFilterByPartition(partitioner, config.controlConfig.numTotalPartitions, partitionId);
+ interceptor.setSpoutOutputCollector(collector);
+ try {
+ zkStateManager = new JobRunningZKStateManager(config);
+ crawler = new RunningJobCrawlerImpl(config, zkStateManager, callback, jobFilter, readWriteLock);
+ } catch (Exception e) {
+ LOG.error("failing creating crawler driver");
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ try{
+ crawler.crawl();
+ }catch(Exception ex){
+ LOG.error("fail crawling running job and continue ...", ex);
+ }
+ try{
+ Thread.sleep(DEFAULT_WAIT_SECONDS_BETWEEN_ROUNDS *1000);
+ }catch(Exception x){
+ }
+ }
+
+ /**
+ * empty because framework will take care of output fields declaration
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ /**
+ * add to processedJob
+ */
+ @Override
+ public void ack(Object msgId) {
+ RunningJobMessageId messageId = (RunningJobMessageId) msgId;
+ JobConstants.ResourceType type = messageId.type;
+ LOG.info("Ack on messageId: " + messageId.toString());
+ switch(type) {
+ case JOB_CONFIGURATION:
+ case JOB_COMPLETE_INFO:
+ /** lock this for making processed/processing job list unchanged during crawler calculating last round running job list **/
+ try {
+ readWriteLock.readLock().lock();
+ zkStateManager.addProcessedJob(type, messageId.jobID);
+ // Here username & timestamp is meaningless, set to null
+ crawler.removeFromProcessingList(type, new JobContext(messageId.jobID, null, null));
+ }
+ finally {
+ try {readWriteLock.readLock().unlock(); LOG.info("Read lock released");}
+ catch (Throwable t) { LOG.error("Fail to release Read lock", t);}
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * job is not fully processed
+ */
+ @Override
+ public void fail(Object msgId) {
+ RunningJobMessageId messageId = (RunningJobMessageId) msgId;
+ JobConstants.ResourceType type = messageId.type;
+ // Here timestamp is meaningless, set to null
+ if (type.equals(JobConstants.ResourceType.JOB_COMPLETE_INFO) || type.equals(JobConstants.ResourceType.JOB_CONFIGURATION)) {
+ try {
+ readWriteLock.readLock().lock();
+ // Here username in not used, set to null
+ crawler.removeFromProcessingList(type, new JobContext(messageId.jobID, null, null));
+ }
+ finally {
+ try {readWriteLock.readLock().unlock(); LOG.info("Read lock released");}
+ catch (Throwable t) { LOG.error("Fail to release Read lock", t);}
+ }
+ }
+ }
+
+ @Override
+ public void deactivate() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
new file mode 100644
index 0000000..d65ad93
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/storm/JobRunningSpoutCollectorInterceptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jobrunning.storm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import backtype.storm.spout.SpoutOutputCollector;
+
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jobrunning.callback.RunningJobMessageId;
+
+public class JobRunningSpoutCollectorInterceptor implements EagleOutputCollector{
+
+ private static final long serialVersionUID = 1L;
+ private SpoutOutputCollector collector;
+
+ public void setSpoutOutputCollector(SpoutOutputCollector collector){
+ this.collector = collector;
+ }
+
+ @Override
+ public void collect(ValuesArray t) {
+ // the first value is fixed as messageId
+ RunningJobMessageId messageId = (RunningJobMessageId) t.get(0);
+ List<Object> list = new ArrayList<Object>();
+ for (int i = 1; i < t.size(); i++) {
+ list.add(t.get(i));
+ }
+ collector.emit(list, messageId);
+ }
+}