You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/22 07:19:00 UTC

[3/6] incubator-eagle git commit: [EAGLE-787] add healthy check for hadoop-queue/topology-health/spark-history apps

[EAGLE-787] add healthy check for hadoop-queue/topology-health/spark-history apps

Author: wujinhu <wu...@126.com>

Closes #671 from wujinhu/EAGLE-787.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7471eced
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7471eced
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7471eced

Branch: refs/heads/master
Commit: 7471eced641b132df7e2950a93a00914be75cc97
Parents: 620959b
Author: wujinhu <wu...@126.com>
Authored: Tue Nov 22 09:53:04 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Nov 22 15:15:08 2016 +0800

----------------------------------------------------------------------
 .../impl/ApplicationHealthCheckBase.java        |  44 ++++++++
 .../impl/ApplicationHealthCheckServiceImpl.java |  10 +-
 .../eagle/app/spi/ApplicationProvider.java      |   1 +
 .../src/main/resources/HealthCheckTemplate.vm   |   2 +-
 .../queue/HadoopQueueRunningAppProvider.java    |   7 ++
 ...adoopQueueRunningApplicationHealthCheck.java | 100 +++++++++++++++++++
 .../MRHistoryJobApplicationHealthCheck.java     |  41 +++++---
 .../history/SparkHistoryJobAppProvider.java     |   7 ++
 .../SparkHistoryJobApplicationHealthCheck.java  |  99 ++++++++++++++++++
 .../topology/TopologyCheckAppProvider.java      |   7 ++
 .../TopologyCheckApplicationHealthCheck.java    |  98 ++++++++++++++++++
 .../eagle/topology/TopologyConstants.java       |   1 +
 12 files changed, 399 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
new file mode 100644
index 0000000..104e76d
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.service.impl;
+
+import com.codahale.metrics.health.HealthCheck;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+
+public abstract class ApplicationHealthCheckBase extends HealthCheck {
+    private static final String APP_ID_PATH = "appId";
+    protected static final long DEFAULT_MAX_DELAY_TIME = 2 * 60 * 60 * 1000L;
+    protected static final String MAX_DELAY_TIME_KEY = "application.maxDelayTime";
+
+    protected Config config;
+
+    @Inject
+    private ApplicationEntityService applicationEntityService;
+
+    protected ApplicationHealthCheckBase(Config config) {
+        this.config = config;
+    }
+
+    protected ApplicationEntity.Status getApplicationStatus() {
+        ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(null, config.getString(APP_ID_PATH));
+        return applicationEntity.getStatus();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
index 9d98b2f..4aea963 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.service.impl;
 
 import com.codahale.metrics.health.HealthCheck;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import io.dropwizard.setup.Environment;
@@ -56,6 +57,9 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
     private static final String HEALTH_PUBLISHER_IMPL_PATH = "application.healthCheck.publisher.publisherImpl";
 
     @Inject
+    private Injector currentInjector;
+
+    @Inject
     public ApplicationHealthCheckServiceImpl(ApplicationProviderService applicationProviderService,
                                              ApplicationEntityService applicationEntityService,
                                              Config config) {
@@ -105,11 +109,12 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
         }
         ApplicationProvider<?> appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType());
         HealthCheck applicationHealthCheck = appProvider.getAppHealthCheck(
-                ConfigFactory.parseMap(appEntity.getConfiguration())
+                        ConfigFactory.parseMap(appEntity.getContext())
                         .withFallback(config)
-                        .withFallback(ConfigFactory.parseMap(appEntity.getContext()))
+                        .withFallback(ConfigFactory.parseMap(appEntity.getConfiguration()))
         );
         this.environment.healthChecks().register(appEntity.getAppId(), applicationHealthCheck);
+        currentInjector.injectMembers(applicationHealthCheck);
         synchronized (lock) {
             if (!appHealthChecks.containsKey(appEntity.getAppId())) {
                 appHealthChecks.put(appEntity.getAppId(), applicationHealthCheck);
@@ -137,7 +142,6 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
         registerAll();
         synchronized (lock) {
             for (String appId : appHealthChecks.keySet()) {
-                LOG.info("check application {}", appId);
                 HealthCheck.Result result = appHealthChecks.get(appId).execute();
                 if (result.isHealthy()) {
                     LOG.info("application {} is healthy", appId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 143e026..d9c1eff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -23,6 +23,7 @@ import org.apache.eagle.app.Application;
 import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.common.module.ModuleRegistry;
 import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
index b368458..51c1186 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
@@ -23,7 +23,7 @@
 <body>
     #set ( $elem = $unHealthyContext )
 
-<p><b>Message: </b>$elem["appId"] is delayed</p>
+<p><b>Message: </b>$elem["appId"] has delayed</p>
 <p><b>Detail: </b>$elem["unHealthyMessage"]</p>
 
 </body>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
index 916dd5b..5d4078c 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
@@ -16,10 +16,17 @@
  */
 package org.apache.eagle.hadoop.queue;
 
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
 public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider<HadoopQueueRunningApp> {
     public HadoopQueueRunningApp getApplication() {
         return new HadoopQueueRunningApp();
     }
+
+    @Override
+    public HealthCheck getAppHealthCheck(Config config) {
+        return new HadoopQueueRunningApplicationHealthCheck(config);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
new file mode 100644
index 0000000..c73021b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
+
+    private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
+
+    public HadoopQueueRunningApplicationHealthCheck(Config config) {
+        super(config);
+        this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
+    }
+
+    @Override
+    public Result check() {
+        HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleService.host,
+                eagleServiceConfig.eagleService.port,
+                eagleServiceConfig.eagleService.username,
+                eagleServiceConfig.eagleService.password);
+
+        client.getJerseyClient().setReadTimeout(60000);
+
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                String message = String.format("Application is not running, status is %s", status.toString());
+                return Result.unhealthy(message);
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+                    Constants.GENERIC_METRIC_SERVICE,
+                    hadoopQueueRunningAppConfig.eagleProps.site);
+
+            GenericServiceAPIResponseEntity response = client
+                    .search(query)
+                    .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
+                    .startTime(System.currentTimeMillis() - 2 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis())
+                    .pageSize(Integer.MAX_VALUE)
+                    .send();
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+            if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+                maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+            }
+
+            if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+                String message = String.format("Current process time is %sms, delay %s hours",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+                return Result.unhealthy(message);
+            } else {
+                return Result.healthy();
+            }
+        } catch (Exception e) {
+            return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index ab078f5..91e16df 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -17,11 +17,12 @@
 
 package org.apache.eagle.jpm.mr.history;
 
-import com.codahale.metrics.health.HealthCheck;
 import com.typesafe.config.Config;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
@@ -30,28 +31,33 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 
-public class MRHistoryJobApplicationHealthCheck extends HealthCheck {
+public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
     private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobApplicationHealthCheck.class);
 
     private MRHistoryJobConfig mrHistoryJobConfig;
-    private static final long DEFAULT_MAX_DELAY_TIME = 2 * 60 * 60 * 1000L;
-    private static final String MAX_DELAY_TIME_KEY = "application.maxDelayTime";
 
     public MRHistoryJobApplicationHealthCheck(Config config) {
+        super(config);
         mrHistoryJobConfig = MRHistoryJobConfig.newInstance(config);
     }
 
     @Override
     public Result check() {
-        try {
-            MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = mrHistoryJobConfig.getEagleServiceConfig();
-            IEagleServiceClient client = new EagleServiceClientImpl(
-                    eagleServiceConfig.eagleServiceHost,
-                    eagleServiceConfig.eagleServicePort,
-                    eagleServiceConfig.username,
-                    eagleServiceConfig.password);
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = mrHistoryJobConfig.getEagleServiceConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
 
-            client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                String message = String.format("Application is not running, status is %s", status.toString());
+                return Result.unhealthy(message);
+            }
 
             String query = String.format("%s[@site=\"%s\"]<@site>{max(currentTimeStamp)}",
                     Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME,
@@ -73,14 +79,21 @@ public class MRHistoryJobApplicationHealthCheck extends HealthCheck {
             }
 
             if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
-                String message = String.format("current process time %sms, delay %sms",
-                        currentProcessTimeStamp, currentTimeStamp - currentProcessTimeStamp);
+                String message = String.format("Current process time is %sms, delay %s hours",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
                 return Result.unhealthy(message);
             } else {
                 return Result.healthy();
             }
         } catch (Exception e) {
             return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
index 343d9c2..2b962c9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.eagle.jpm.spark.history;
 
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
 public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<SparkHistoryJobApp> {
@@ -24,4 +26,9 @@ public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<Spar
     public SparkHistoryJobApp getApplication() {
         return new SparkHistoryJobApp();
     }
+
+    @Override
+    public HealthCheck getAppHealthCheck(Config config) {
+        return new SparkHistoryJobApplicationHealthCheck(config);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
new file mode 100644
index 0000000..bbca566
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
+
+    private SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
+
+    public SparkHistoryJobApplicationHealthCheck(Config config) {
+        super(config);
+        this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
+    }
+
+    @Override
+    public Result check() {
+        SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.host,
+                eagleServiceConfig.port,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                String message = String.format("Application is not running, status is %s", status.toString());
+                return Result.unhealthy(message);
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
+                    Constants.SPARK_APP_SERVICE_ENDPOINT_NAME,
+                    sparkHistoryJobAppConfig.stormConfig.siteId);
+
+            GenericServiceAPIResponseEntity response = client
+                    .search(query)
+                    .startTime(0L)
+                    .endTime(System.currentTimeMillis())
+                    .pageSize(10)
+                    .send();
+
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+            if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+                maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+            }
+
+            if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
+                String message = String.format("Current process time is %sms, delay %s hours",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+                return Result.unhealthy(message);
+            } else {
+                return Result.healthy();
+            }
+        } catch (Exception e) {
+            return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
index 5969a7a..5766454 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.topology;
 
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
 public class TopologyCheckAppProvider extends AbstractApplicationProvider<TopologyCheckApp> {
@@ -25,4 +27,9 @@ public class TopologyCheckAppProvider extends AbstractApplicationProvider<Topolo
     public TopologyCheckApp getApplication() {
         return new TopologyCheckApp();
     }
+
+    @Override
+    public HealthCheck getAppHealthCheck(Config config) {
+        return new TopologyCheckApplicationHealthCheck(config);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
new file mode 100644
index 0000000..25730c8
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
+
+    private TopologyCheckAppConfig topologyCheckAppConfig;
+
+    public TopologyCheckApplicationHealthCheck(Config config) {
+        super(config);
+        topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
+    }
+
+    @Override
+    public Result check() {
+        //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                topologyCheckAppConfig.getConfig().getString("service.host"),
+                topologyCheckAppConfig.getConfig().getInt("service.port"),
+                topologyCheckAppConfig.getConfig().getString("service.username"),
+                topologyCheckAppConfig.getConfig().getString("service.password"));
+
+        client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                String message = String.format("Application is not running, status is %s", status.toString());
+                return Result.unhealthy(message);
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+                    TopologyConstants.GENERIC_METRIC_SERVICE,
+                    topologyCheckAppConfig.dataExtractorConfig.site);
+
+            GenericServiceAPIResponseEntity response = client
+                    .search(query)
+                    .metricName(String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, TopologyConstants.REGIONSERVER_ROLE))
+                    .startTime(System.currentTimeMillis() - 2 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis())
+                    .pageSize(Integer.MAX_VALUE)
+                    .send();
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+            if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+                maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+            }
+
+            if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+                String message = String.format("Current process time is %sms, delay %s hours",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+                return Result.unhealthy(message);
+            } else {
+                return Result.healthy();
+            }
+        } catch (Exception e) {
+            return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7471eced/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
index 74f446b..3e535a8 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
@@ -26,6 +26,7 @@ public class TopologyConstants {
     public static final String HBASE_INSTANCE_SERVICE_NAME = "HbaseServiceInstance";
     public static final String MR_INSTANCE_SERVICE_NAME = "MRServiceInstance";
     public static final String JN_INSTANCE_SERVICE_NAME = "JNServiceInstance";
+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
 
     public static final int DEFAULT_READ_TIMEOUT = 30 * 60 * 1000; // in milliseconds
     public static final Pattern HTTP_HOST_MATCH_PATTERN = Pattern.compile("^https?://(.+?):-?(\\d+)/?");