You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/22 01:53:09 UTC
incubator-eagle git commit: [EAGLE-787] add healthy check for
hadoop-queue/topology-health/spark-history apps
Repository: incubator-eagle
Updated Branches:
refs/heads/master 57da45a8d -> 14d55dd51
[EAGLE-787] add healthy check for hadoop-queue/topology-health/spark-history apps
Author: wujinhu <wu...@126.com>
Closes #671 from wujinhu/EAGLE-787.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/14d55dd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/14d55dd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/14d55dd5
Branch: refs/heads/master
Commit: 14d55dd51eaf3ccd51d28f23ad151ccd916d70aa
Parents: 57da45a
Author: wujinhu <wu...@126.com>
Authored: Tue Nov 22 09:53:04 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 22 09:53:04 2016 +0800
----------------------------------------------------------------------
.../impl/ApplicationHealthCheckBase.java | 44 ++++++++
.../impl/ApplicationHealthCheckServiceImpl.java | 10 +-
.../eagle/app/spi/ApplicationProvider.java | 1 +
.../src/main/resources/HealthCheckTemplate.vm | 2 +-
.../queue/HadoopQueueRunningAppProvider.java | 7 ++
...adoopQueueRunningApplicationHealthCheck.java | 100 +++++++++++++++++++
.../MRHistoryJobApplicationHealthCheck.java | 41 +++++---
.../history/SparkHistoryJobAppProvider.java | 7 ++
.../SparkHistoryJobApplicationHealthCheck.java | 99 ++++++++++++++++++
.../topology/TopologyCheckAppProvider.java | 7 ++
.../TopologyCheckApplicationHealthCheck.java | 98 ++++++++++++++++++
.../eagle/topology/TopologyConstants.java | 1 +
12 files changed, 399 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
new file mode 100644
index 0000000..104e76d
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckBase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.service.impl;
+
+import com.codahale.metrics.health.HealthCheck;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+
+/**
+ * Common base class for per-application health checks.
+ *
+ * <p>Keeps the application config supplied by the subclass and looks up the
+ * application's status through the injected {@link ApplicationEntityService}.
+ */
+public abstract class ApplicationHealthCheckBase extends HealthCheck {
+    /** Config path that holds the id of the application being checked. */
+    private static final String APP_ID_PATH = "appId";
+    /** Fallback maximum delay: two hours, expressed in milliseconds. */
+    protected static final long DEFAULT_MAX_DELAY_TIME = 2 * 60 * 60 * 1000L;
+    /** Config path subclasses read to override {@link #DEFAULT_MAX_DELAY_TIME}. */
+    protected static final String MAX_DELAY_TIME_KEY = "application.maxDelayTime";
+
+    protected Config config;
+
+    // Populated by member injection rather than by the constructor.
+    @Inject
+    private ApplicationEntityService applicationEntityService;
+
+    /**
+     * Stores the config used to resolve the application id.
+     *
+     * @param config application config; read at {@code appId} when the status is requested
+     */
+    protected ApplicationHealthCheckBase(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Looks up the application entity by the configured app id and returns its status.
+     *
+     * @return the status reported by the application entity
+     */
+    protected ApplicationEntity.Status getApplicationStatus() {
+        final String appId = config.getString(APP_ID_PATH);
+        return applicationEntityService.getByUUIDOrAppId(null, appId).getStatus();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
index 9d98b2f..4aea963 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.service.impl;
import com.codahale.metrics.health.HealthCheck;
import com.google.inject.Inject;
+import com.google.inject.Injector;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.dropwizard.setup.Environment;
@@ -56,6 +57,9 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
private static final String HEALTH_PUBLISHER_IMPL_PATH = "application.healthCheck.publisher.publisherImpl";
@Inject
+ private Injector currentInjector;
+
+ @Inject
public ApplicationHealthCheckServiceImpl(ApplicationProviderService applicationProviderService,
ApplicationEntityService applicationEntityService,
Config config) {
@@ -105,11 +109,12 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
}
ApplicationProvider<?> appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType());
HealthCheck applicationHealthCheck = appProvider.getAppHealthCheck(
- ConfigFactory.parseMap(appEntity.getConfiguration())
+ ConfigFactory.parseMap(appEntity.getContext())
.withFallback(config)
- .withFallback(ConfigFactory.parseMap(appEntity.getContext()))
+ .withFallback(ConfigFactory.parseMap(appEntity.getConfiguration()))
);
this.environment.healthChecks().register(appEntity.getAppId(), applicationHealthCheck);
+ currentInjector.injectMembers(applicationHealthCheck);
synchronized (lock) {
if (!appHealthChecks.containsKey(appEntity.getAppId())) {
appHealthChecks.put(appEntity.getAppId(), applicationHealthCheck);
@@ -137,7 +142,6 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
registerAll();
synchronized (lock) {
for (String appId : appHealthChecks.keySet()) {
- LOG.info("check application {}", appId);
HealthCheck.Result result = appHealthChecks.get(appId).execute();
if (result.isHealthy()) {
LOG.info("application {} is healthy", appId);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 143e026..d9c1eff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -23,6 +23,7 @@ import org.apache.eagle.app.Application;
import org.apache.eagle.app.service.ApplicationListener;
import org.apache.eagle.common.module.ModuleRegistry;
import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
import java.lang.reflect.ParameterizedType;
import java.util.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
index b368458..51c1186 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
@@ -23,7 +23,7 @@
<body>
#set ( $elem = $unHealthyContext )
-<p><b>Message: </b>$elem["appId"] is delayed</p>
+<p><b>Message: </b>$elem["appId"] has delayed</p>
<p><b>Detail: </b>$elem["unHealthyMessage"]</p>
</body>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
index 916dd5b..5d4078c 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
@@ -16,10 +16,17 @@
*/
package org.apache.eagle.hadoop.queue;
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
import org.apache.eagle.app.spi.AbstractApplicationProvider;
/**
 * Application provider for {@link HadoopQueueRunningApp}: hands out new application
 * instances and the matching {@link HadoopQueueRunningApplicationHealthCheck}, which is
 * built from the config passed to {@code getAppHealthCheck}.
 */
public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider<HadoopQueueRunningApp> {
public HadoopQueueRunningApp getApplication() {
return new HadoopQueueRunningApp();
}
+
+ @Override
+ public HealthCheck getAppHealthCheck(Config config) {
+ return new HadoopQueueRunningApplicationHealthCheck(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
new file mode 100644
index 0000000..c73021b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
+
+ private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
+
+ public HadoopQueueRunningApplicationHealthCheck(Config config) {
+ super(config);
+ this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
+ }
+
+ @Override
+ public Result check() {
+ HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.eagleService.host,
+ eagleServiceConfig.eagleService.port,
+ eagleServiceConfig.eagleService.username,
+ eagleServiceConfig.eagleService.password);
+
+ client.getJerseyClient().setReadTimeout(60000);
+
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ String message = String.format("Application is not running, status is %s", status.toString());
+ return Result.unhealthy(message);
+ }
+
+ String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+ Constants.GENERIC_METRIC_SERVICE,
+ hadoopQueueRunningAppConfig.eagleProps.site);
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
+ .startTime(System.currentTimeMillis() - 2 * 60 * 60000L)
+ .endTime(System.currentTimeMillis())
+ .pageSize(Integer.MAX_VALUE)
+ .send();
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ long currentTimeStamp = System.currentTimeMillis();
+ long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+ if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+ maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+ }
+
+ if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+ String message = String.format("Current process time is %sms, delay %s hours",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+ return Result.unhealthy(message);
+ } else {
+ return Result.healthy();
+ }
+ } catch (Exception e) {
+ return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index ab078f5..91e16df 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -17,11 +17,12 @@
package org.apache.eagle.jpm.mr.history;
-import com.codahale.metrics.health.HealthCheck;
import com.typesafe.config.Config;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
@@ -30,28 +31,33 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-public class MRHistoryJobApplicationHealthCheck extends HealthCheck {
+public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobApplicationHealthCheck.class);
private MRHistoryJobConfig mrHistoryJobConfig;
- private static final long DEFAULT_MAX_DELAY_TIME = 2 * 60 * 60 * 1000L;
- private static final String MAX_DELAY_TIME_KEY = "application.maxDelayTime";
public MRHistoryJobApplicationHealthCheck(Config config) {
+ super(config);
mrHistoryJobConfig = MRHistoryJobConfig.newInstance(config);
}
@Override
public Result check() {
- try {
- MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = mrHistoryJobConfig.getEagleServiceConfig();
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = mrHistoryJobConfig.getEagleServiceConfig();
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
+
+ client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ String message = String.format("Application is not running, status is %s", status.toString());
+ return Result.unhealthy(message);
+ }
String query = String.format("%s[@site=\"%s\"]<@site>{max(currentTimeStamp)}",
Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME,
@@ -73,14 +79,21 @@ public class MRHistoryJobApplicationHealthCheck extends HealthCheck {
}
if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
- String message = String.format("current process time %sms, delay %sms",
- currentProcessTimeStamp, currentTimeStamp - currentProcessTimeStamp);
+ String message = String.format("Current process time is %sms, delay %s hours",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
return Result.unhealthy(message);
} else {
return Result.healthy();
}
} catch (Exception e) {
return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
index 343d9c2..2b962c9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
@@ -17,6 +17,8 @@
package org.apache.eagle.jpm.spark.history;
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
import org.apache.eagle.app.spi.AbstractApplicationProvider;
public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<SparkHistoryJobApp> {
@@ -24,4 +26,9 @@ public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<Spar
public SparkHistoryJobApp getApplication() {
return new SparkHistoryJobApp();
}
+
+ @Override
+ public HealthCheck getAppHealthCheck(Config config) {
+ return new SparkHistoryJobApplicationHealthCheck(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
new file mode 100644
index 0000000..bbca566
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Health check for the Spark history job application.
+ *
+ * <p>The check is unhealthy when the application is not in RUNNING status, when the
+ * newest Spark application end time recorded for the site is older than three times
+ * the allowed delay, or when the check itself fails with an exception.
+ */
+public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
+
+    private SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
+
+    /**
+     * Creates the check and parses the application-specific settings from the config.
+     *
+     * @param config application config, also handed to the base class
+     */
+    public SparkHistoryJobApplicationHealthCheck(Config config) {
+        super(config);
+        this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
+    }
+
+    /**
+     * Queries the Eagle service for the latest Spark application end time of the
+     * configured site and compares its age against the maximum allowed delay.
+     *
+     * @return a healthy result when the application is running and the data is fresh
+     *         enough; otherwise an unhealthy result describing the problem
+     */
+    @Override
+    public Result check() {
+        SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
+        IEagleServiceClient client = new EagleServiceClientImpl(
+            eagleServiceConfig.host,
+            eagleServiceConfig.port,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
+
+        client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                String message = String.format("Application is not running, status is %s", status.toString());
+                return Result.unhealthy(message);
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
+                Constants.SPARK_APP_SERVICE_ENDPOINT_NAME,
+                sparkHistoryJobAppConfig.stormConfig.siteId);
+
+            GenericServiceAPIResponseEntity response = client
+                .search(query)
+                .startTime(0L)
+                .endTime(System.currentTimeMillis())
+                .pageSize(10)
+                .send();
+
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+            if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+                maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+            }
+
+            // This check tolerates three times the configured delay.
+            if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
+                String message = String.format("Current process time is %sms, delay %s hours",
+                    currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+                return Result.unhealthy(message);
+            } else {
+                return Result.healthy();
+            }
+        } catch (Exception e) {
+            // Exceptions such as an out-of-range access on an empty result list carry no
+            // cause; passing null to getStackTrace would raise a second exception from
+            // this handler, so fall back to the exception itself.
+            Throwable reported = e.getCause() != null ? e.getCause() : e;
+            return Result.unhealthy(ExceptionUtils.getStackTrace(reported));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                // Best effort: a failed close must not change the check result.
+                LOG.warn("Failed to close eagle service client", e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
index 5969a7a..5766454 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
@@ -18,6 +18,8 @@
package org.apache.eagle.topology;
+import com.codahale.metrics.health.HealthCheck;
+import com.typesafe.config.Config;
import org.apache.eagle.app.spi.AbstractApplicationProvider;
public class TopologyCheckAppProvider extends AbstractApplicationProvider<TopologyCheckApp> {
@@ -25,4 +27,9 @@ public class TopologyCheckAppProvider extends AbstractApplicationProvider<Topolo
public TopologyCheckApp getApplication() {
return new TopologyCheckApp();
}
+
+ @Override
+ public HealthCheck getAppHealthCheck(Config config) {
+ return new TopologyCheckApplicationHealthCheck(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
new file mode 100644
index 0000000..25730c8
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
+
+ private TopologyCheckAppConfig topologyCheckAppConfig;
+
+ public TopologyCheckApplicationHealthCheck(Config config) {
+ super(config);
+ topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
+ }
+
+ @Override
+ public Result check() {
+ //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ topologyCheckAppConfig.getConfig().getString("service.host"),
+ topologyCheckAppConfig.getConfig().getInt("service.port"),
+ topologyCheckAppConfig.getConfig().getString("service.username"),
+ topologyCheckAppConfig.getConfig().getString("service.password"));
+
+ client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ String message = String.format("Application is not running, status is %s", status.toString());
+ return Result.unhealthy(message);
+ }
+
+ String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+ TopologyConstants.GENERIC_METRIC_SERVICE,
+ topologyCheckAppConfig.dataExtractorConfig.site);
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .metricName(String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, TopologyConstants.REGIONSERVER_ROLE))
+ .startTime(System.currentTimeMillis() - 2 * 60 * 60000L)
+ .endTime(System.currentTimeMillis())
+ .pageSize(Integer.MAX_VALUE)
+ .send();
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ long currentTimeStamp = System.currentTimeMillis();
+ long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+ if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+ maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+ }
+
+ if (currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+ String message = String.format("Current process time is %sms, delay %s hours",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+ return Result.unhealthy(message);
+ } else {
+ return Result.healthy();
+ }
+ } catch (Exception e) {
+ return Result.unhealthy(ExceptionUtils.getStackTrace(e.getCause()));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/14d55dd5/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
index 74f446b..3e535a8 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
@@ -26,6 +26,7 @@ public class TopologyConstants {
public static final String HBASE_INSTANCE_SERVICE_NAME = "HbaseServiceInstance";
public static final String MR_INSTANCE_SERVICE_NAME = "MRServiceInstance";
public static final String JN_INSTANCE_SERVICE_NAME = "JNServiceInstance";
+ public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
public static final int DEFAULT_READ_TIMEOUT = 30 * 60 * 1000; // in milliseconds
public static final Pattern HTTP_HOST_MATCH_PATTERN = Pattern.compile("^https?://(.+?):-?(\\d+)/?");