You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:27 UTC
[3/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric
Process and Persistence with Application Framework
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
index 7495ef5..40f08c1 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
@@ -45,7 +45,7 @@ public class JDBCDataSourceProvider implements Provider<DataSource> {
@Override
public void run() {
try {
- LOGGER.info("Shutting down data source");
+ LOGGER.info("Shutting down data fromStream");
datasource.close();
} catch (SQLException e) {
LOGGER.error("SQLException: {}", e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
index 6ddf3c6..02bf1cc 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
@@ -210,10 +210,10 @@ public class ApplicationEntityServiceJDBCImpl implements ApplicationEntityServic
ExecutionRuntime runtime = ExecutionRuntimeManager.getInstance().getRuntime(
applicationProviderService.getApplicationProviderByType(entity.getDescriptor().getType()).getApplication().getEnvironmentType(), config);
StreamSinkConfig streamSinkConfig = runtime.environment()
- .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig);
+ .stream().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig);
StreamDesc streamDesc = new StreamDesc();
streamDesc.setSchema(copied);
- streamDesc.setSink(streamSinkConfig);
+ streamDesc.setSinkConfig(streamSinkConfig);
streamDesc.setStreamId(copied.getStreamId());
streamDesc.getSchema().setDataSource(entity.getAppId());
return streamDesc;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
index d06a3e4..fa59e8a 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
@@ -25,7 +25,8 @@ import java.io.Serializable;
/**
* Some common codes to enable DAO through eagle service including service host/post, credential population etc.
*/
-public class EagleServiceConnector implements Serializable{
+@Deprecated
+public class EagleServiceConnector implements Serializable {
private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
index 912f1f7..f0b6283 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.service.client.impl;
+import com.typesafe.config.Config;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
@@ -33,14 +34,24 @@ import java.util.Map;
public class EagleServiceClientImpl extends EagleServiceBaseClient {
private final static Logger LOG = LoggerFactory.getLogger(EagleServiceClientImpl.class);
- public EagleServiceClientImpl(String host, int port){
+ public EagleServiceClientImpl(String host, int port) {
super(host, port);
}
- public EagleServiceClientImpl(EagleServiceConnector connector){
+ @Deprecated
+ public EagleServiceClientImpl(EagleServiceConnector connector) {
this(connector.getEagleServiceHost(), connector.getEagleServicePort(), connector.getUsername(), connector.getPassword());
}
+ public EagleServiceClientImpl (Config config) {
+ super(
+ config.hasPath("service.host") ? config.getString("service.host") : "localhost",
+ config.hasPath("service.port") ? config.getInt("service.port") : 9090,
+ config.hasPath("service.username") ? config.getString("service.username") : null,
+ config.hasPath("service.password") ? config.getString("service.password") : null
+ );
+ }
+
public EagleServiceClientImpl(String host, int port, String username, String password){
super(host, port, username, password);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index c78cd3c..f1f4e58 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -57,7 +57,7 @@ public class ExampleStormApplication extends StormApplication{
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index de9be89..0c9ba51 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -44,7 +44,7 @@
},
"application": {
"sink": {
- "type": "org.apache.eagle.app.sink.KafkaStreamSink",
+ "type": "org.apache.eagle.app.messaging.KafkaStreamSink",
"config": {
"kafkaBrokerHost": "",
"kafkaZkConnection": ""
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
index a3052d8..1453c3e 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -29,7 +29,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index ad1521f..a3d1cb0 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -31,6 +31,20 @@
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-app-base</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
new file mode 100644
index 0000000..7f5e21b
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+import backtype.storm.generated.StormTopology;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.builder.CEPFunction;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+
+public class HadoopMetricMonitorApp extends StormApplication {
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ return environment.newApp(config)
+ .fromStream("HADOOP_JMX_METRIC_STREAM")
+ .saveAsMetric(MetricDefinition
+ .namedByField("metric")
+ .eventTimeByField("timestamp")
+ .dimensionFields("host","component","site")
+ .valueField("value"))
+ .toTopology();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
index e6ebde1..dc7ea97 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
@@ -16,8 +16,14 @@
*/
package org.apache.eagle.metric;
-import org.apache.eagle.app.StaticApplicationProvider;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
-public class HadoopMetricMonitorAppProdiver extends StaticApplicationProvider {
- // Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+/**
+ * Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml.
+ */
+public class HadoopMetricMonitorAppProdiver extends AbstractApplicationProvider<HadoopMetricMonitorApp> {
+ @Override
+ public HadoopMetricMonitorApp getApplication() {
+ return new HadoopMetricMonitorApp();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 752c0cb..07270a5 100644
--- a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -21,7 +21,7 @@
<name>Hadoop JMX Metric Monitor</name>
<version>0.5.0-incubating</version>
<configuration>
- <!-- data source configurations -->
+ <!-- data fromStream configurations -->
<property>
<name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name>
<displayName>JMX Metric Kafka Topic</displayName>
@@ -41,8 +41,6 @@
<stream>
<streamId>HADOOP_JMX_METRIC_STREAM</streamId>
<description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description>
- <validate>true</validate>
- <timeseries>true</timeseries>
<columns>
<column>
<name>host</name>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java b/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
deleted file mode 100644
index ee0b3c0..0000000
--- a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-
-import com.google.inject.Inject;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.test.ApplicationSimulator;
-import org.apache.eagle.app.test.ApplicationTestBase;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class HadoopMetricMonitorAppProdiverTest extends ApplicationTestBase {
-
- @Inject
- private SiteResource siteResource;
- @Inject
- private ApplicationResource applicationResource;
- @Inject
- private ApplicationSimulator simulator;
- @Inject
- ApplicationStatusUpdateService statusUpdateService;
-
- @Test
- public void testApplicationLifecycle() throws InterruptedException {
- // Create local site
- SiteEntity siteEntity = new SiteEntity();
- siteEntity.setSiteId("test_site");
- siteEntity.setSiteName("Test Site");
- siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR");
- siteResource.createSite(siteEntity);
- Assert.assertNotNull(siteEntity.getUuid());
-
- ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL);
- installOperation.setConfiguration(getConf());
- // Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
- // Uninstall application
- applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
- try {
- applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
- Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
- } catch (Exception ex) {
- // Expected exception
- }
- }
-
- private Map<String, Object> getConf() {
- Map<String, Object> conf = new HashMap<>();
- conf.put("dataSinkConfig.topic", "testTopic");
- conf.put("dataSinkConfig.brokerList", "broker");
- conf.put("dataSinkConfig.serializerClass", "serializerClass");
- conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
- conf.put("dataSinkConfig.producerType", "async");
- conf.put("dataSinkConfig.numBatchMessages", 4096);
- conf.put("dataSinkConfig.maxQueueBufferMs", 5000);
- conf.put("dataSinkConfig.requestRequiredAcks", 0);
- conf.put("spoutNum", 2);
- conf.put("mode", "LOCAL");
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
new file mode 100644
index 0000000..03ba4ee
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+public class HadoopMetricMonitorAppDebug {
+ public static void main(String[] args) {
+ new HadoopMetricMonitorApp().run(args);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
new file mode 100644
index 0000000..eb343d9
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HadoopMetricMonitorAppProviderTest extends ApplicationTestBase {
+
+ @Inject
+ private SiteResource siteResource;
+ @Inject
+ private ApplicationResource applicationResource;
+ @Inject
+ private ApplicationSimulator simulator;
+ @Inject
+ ApplicationStatusUpdateService statusUpdateService;
+
+ @Test
+ public void testApplicationLifecycle() throws InterruptedException {
+ // Create local site
+ SiteEntity siteEntity = new SiteEntity();
+ siteEntity.setSiteId("test_site");
+ siteEntity.setSiteName("Test Site");
+ siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR");
+ siteResource.createSite(siteEntity);
+ Assert.assertNotNull(siteEntity.getUuid());
+
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(getConf());
+ // Install application
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
+ // Uninstall application
+ applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+ try {
+ applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
+ Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
+ } catch (Exception ex) {
+ // Expected exception
+ }
+ }
+
+ private Map<String, Object> getConf() {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("dataSinkConfig.topic", "testTopic");
+ conf.put("dataSinkConfig.brokerList", "broker");
+ conf.put("dataSinkConfig.serializerClass", "serializerClass");
+ conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+ conf.put("dataSinkConfig.producerType", "async");
+ conf.put("dataSinkConfig.numBatchMessages", 4096);
+ conf.put("dataSinkConfig.maxQueueBufferMs", 5000);
+ conf.put("dataSinkConfig.requestRequiredAcks", 0);
+ conf.put("spoutNum", 2);
+ conf.put("mode", "LOCAL");
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
new file mode 100644
index 0000000..67b94f8
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigFactory;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.io.IOUtils;
+import org.apache.eagle.app.messaging.KafkaStreamProvider;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+public class SendSampleDataToKafka {
+ public static void main(String[] args) throws URISyntaxException, IOException {
+ KafkaStreamSinkConfig config = new KafkaStreamProvider().getSinkConfig("HADOOP_JMX_METRIC_STREAM",ConfigFactory.load());
+ Properties properties = new Properties();
+ properties.put("metadata.broker.list", config.getBrokerList());
+ properties.put("serializer.class", config.getSerializerClass());
+ properties.put("key.serializer.class", config.getKeySerializerClass());
+ // new added properties for async producer
+ properties.put("producer.type", config.getProducerType());
+ properties.put("batch.num.messages", config.getNumBatchMessages());
+ properties.put("request.required.acks", config.getRequestRequiredAcks());
+ properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
+ ProducerConfig producerConfig = new ProducerConfig(properties);
+ kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(producerConfig);
+ try {
+ InputStream is = SendSampleDataToKafka.class.getResourceAsStream("hadoop_jmx_metric_sample.json");
+ Preconditions.checkNotNull(is, "hadoop_jmx_metric_sample.json");
+ String value = IOUtils.toString(is);
+ producer.send(new KeyedMessage(config.getTopicId(), value));
+ } finally {
+ producer.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-hadoop-metric/src/test/resources/application.conf
new file mode 100644
index 0000000..8ff6016
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/resources/application.conf
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+
+{
+ service {
+ env = "testing"
+ host = "localhost"
+ port = 9090
+ username = "admin"
+ password = "secret"
+ readTimeOutSeconds = 60
+ context = "/rest"
+ timezone = "UTC"
+ }
+
+ "appId" : "HadoopJmxAppForTest",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
+ "dataSourceConfig": {
+ "topic" : "hadoop_jmx_metric",
+ "zkConnection" : "localhost:2181",
+ "txZkServers" : "localhost:2181"
+ }
+ "dataSinkConfig": {
+ "topic" : "hadoop_jmx_metric",
+ "brokerList" : "localhost:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ "producerType" : "async",
+ "numBatchMessages" : "4096",
+ "maxQueueBufferMs" : "5000",
+ "requestRequiredAcks" : "0"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
new file mode 100644
index 0000000..f0f62f2
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
@@ -0,0 +1,8 @@
+{
+ "host":"localhost",
+ "timestamp": 1480319107,
+ "metric": "hadoop.cpu.usage",
+ "component": "namenode",
+ "site": "test",
+ "value": 0.96
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6a7535c..6471dfc 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -1,102 +1,102 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.hadoop.queue;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
- private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
- private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
-
- private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
-
- public HadoopQueueRunningApplicationHealthCheck(Config config) {
- super(config);
- this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
- }
-
- @Override
- public Result check() {
- HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleService.host,
- eagleServiceConfig.eagleService.port,
- eagleServiceConfig.eagleService.username,
- eagleServiceConfig.eagleService.password);
-
- client.getJerseyClient().setReadTimeout(60000);
-
- String message = "";
- try {
- ApplicationEntity.Status status = getApplicationStatus();
- if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
- message += String.format("Application is not RUNNING, status is %s. ", status.toString());
- }
-
-
- String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
- Constants.GENERIC_METRIC_SERVICE,
- hadoopQueueRunningAppConfig.eagleProps.site);
-
- GenericServiceAPIResponseEntity response = client
- .search(query)
- .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
- .startTime(System.currentTimeMillis() - 24 * 60 * 60000L)
- .endTime(System.currentTimeMillis())
- .pageSize(10)
- .send();
- List<Map<List<String>, List<Double>>> results = response.getObj();
- long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
- long currentTimeStamp = System.currentTimeMillis();
- long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
- if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
- maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
- }
-
- if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
- message += String.format("Current process time is %sms, delay %s minutes.",
- currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
- return Result.unhealthy(message);
- } else {
- return Result.healthy();
- }
- } catch (Exception e) {
- return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
- } finally {
- client.getJerseyClient().destroy();
- try {
- client.close();
- } catch (Exception e) {
- LOG.warn("{}", e);
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
+ private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
+
+ private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
+
+ public HadoopQueueRunningApplicationHealthCheck(Config config) {
+ super(config);
+ this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
+ }
+
+ @Override
+ public Result check() {
+ HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.eagleService.host,
+ eagleServiceConfig.eagleService.port,
+ eagleServiceConfig.eagleService.username,
+ eagleServiceConfig.eagleService.password);
+
+ client.getJerseyClient().setReadTimeout(60000);
+
+ String message = "";
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+ }
+
+
+ String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+ Constants.GENERIC_METRIC_SERVICE,
+ hadoopQueueRunningAppConfig.eagleProps.site);
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
+ .startTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+ .endTime(System.currentTimeMillis())
+ .pageSize(10)
+ .send();
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ long currentTimeStamp = System.currentTimeMillis();
+ long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+ if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+ maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+ }
+
+ if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+ message += String.format("Current process time is %sms, delay %s minutes.",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
+ return Result.unhealthy(message);
+ } else {
+ return Result.healthy();
+ }
+ } catch (Exception e) {
+ return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
index f19c366..e145cf3 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
@@ -76,7 +76,7 @@ public class AggregationSpout extends BaseRichSpout {
//1, get last updateTime;
lastUpdateTime = AggregationTimeManager.instance().readLastFinishTime();
if (lastUpdateTime == 0L) {
- //init state, just set to currentTime - 18 hours
+ //prepare state, just set to currentTime - 18 hours
lastUpdateTime = (currentJobTime - (MAX_SAFE_TIME + MAX_WAIT_TIME)) / 3600000 * 3600000;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index 907ccdb..66906f0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -17,10 +17,9 @@
package org.apache.eagle.jpm.mr.history;
import backtype.storm.topology.BoltDeclarer;
-import com.codahale.metrics.health.HealthCheck;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
@@ -37,11 +36,11 @@ import java.util.regex.Pattern;
public class MRHistoryJobApplication extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
- //1. trigger init conf
+ //1. trigger prepare conf
MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(config);
com.typesafe.config.Config jhfAppConf = appConfig.getConfig();
- //2. init JobHistoryContentFilter
+ //2. prepare JobHistoryContentFilter
final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(",");
List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length);
@@ -60,7 +59,7 @@ public class MRHistoryJobApplication extends StormApplication {
builder.includeJobKeyPatterns(Pattern.compile(key));
}
JobHistoryContentFilter filter = builder.build();
- //3. init topology
+ //3. prepare topology
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spoutName = "mrHistoryJobSpout";
int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index e5c7c87..de0d846 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -32,7 +32,7 @@ import java.util.List;
public class MRRunningJobApplication extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
- //1. trigger init conf
+ //1. trigger prepare conf
MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
@@ -46,7 +46,7 @@ public class MRRunningJobApplication extends StormApplication {
confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
- //2. init topology
+ //2. prepare topology
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spoutName = "mrRunningJobFetchSpout";
String boltName = "mrRunningJobParseBolt";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
index 66da734..1b7b8ed 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
@@ -1,4 +1,4 @@
-\ufeff<!--
+<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
index d6f5031..fdfcaad 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -1,99 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
- private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
-
- private SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
-
- public SparkHistoryJobApplicationHealthCheck(Config config) {
- super(config);
- this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
- }
-
- @Override
- public Result check() {
- SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.host,
- eagleServiceConfig.port,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
-
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
-
- String message = "";
- try {
- ApplicationEntity.Status status = getApplicationStatus();
- if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
- message += String.format("Application is not RUNNING, status is %s. ", status.toString());
- }
-
- String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
- Constants.SPARK_APP_SERVICE_ENDPOINT_NAME,
- sparkHistoryJobAppConfig.stormConfig.siteId);
-
- GenericServiceAPIResponseEntity response = client
- .search(query)
- .startTime(System.currentTimeMillis() - 12 * 60 * 60000L)
- .endTime(System.currentTimeMillis())
- .pageSize(10)
- .send();
-
- List<Map<List<String>, List<Double>>> results = response.getObj();
- long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
- long currentTimeStamp = System.currentTimeMillis();
- long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
- if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
- maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
- }
-
- if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
- message += String.format("Current process time is %sms, delay %s hours.",
- currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
- return Result.unhealthy(message);
- } else {
- return Result.healthy();
- }
- } catch (Exception e) {
- return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
- } finally {
- client.getJerseyClient().destroy();
- try {
- client.close();
- } catch (Exception e) {
- LOG.warn("{}", e);
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
+
+ private SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
+
+ public SparkHistoryJobApplicationHealthCheck(Config config) {
+ super(config);
+ this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
+ }
+
+ @Override
+ public Result check() {
+ SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.host,
+ eagleServiceConfig.port,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
+
+ client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+
+ String message = "";
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+ }
+
+ String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
+ Constants.SPARK_APP_SERVICE_ENDPOINT_NAME,
+ sparkHistoryJobAppConfig.stormConfig.siteId);
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .startTime(System.currentTimeMillis() - 12 * 60 * 60000L)
+ .endTime(System.currentTimeMillis())
+ .pageSize(10)
+ .send();
+
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ long currentTimeStamp = System.currentTimeMillis();
+ long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+ if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+ maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+ }
+
+ if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
+ message += String.format("Current process time is %sms, delay %s hours.",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+ return Result.unhealthy(message);
+ } else {
+ return Result.healthy();
+ }
+ } catch (Exception e) {
+ return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 209481a..0e1aacd 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -29,10 +29,10 @@ import com.typesafe.config.Config;
public class SparkRunningJobApp extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
- //1. trigger init conf
+ //1. trigger prepare conf
SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.newInstance(config);
- //2. init topology
+ //2. prepare topology
TopologyBuilder topologyBuilder = new TopologyBuilder();
final String spoutName = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME;
final String boltName = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index c5c0388..11a22e5 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -25,7 +25,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import storm.kafka.StringScheme;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index a1daf89..6d7022b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -24,18 +24,16 @@ import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.partition.*;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
-import storm.kafka.StringScheme;
/**
* Since 8/10/16.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
index 7a4509b..4df4a5b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
@@ -26,7 +26,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.security.hive.jobrunning.HiveJobRunningSourcedStormSpoutProvider;
import org.apache.eagle.security.hive.jobrunning.HiveQueryParserBolt;
import org.apache.eagle.security.hive.jobrunning.JobFilterBolt;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
index c1c3033..32dcc30 100644
--- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
+++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
@@ -19,7 +19,7 @@ package org.apache.eagle.security.oozie.parse;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinBolt;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index 5f6c240..705ef6f 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -83,8 +83,8 @@ metadata {
# Eagle Application Configuration
# ---------------------------------------------
application {
- sink {
- type = org.apache.eagle.app.sink.KafkaStreamSink
+ stream {
+ provider = org.apache.eagle.app.messaging.KafkaStreamProvider
}
storm {
nimbusHost = "server.eagle.apache.org"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index ce68550..20f5b2e 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -85,8 +85,8 @@ metadata {
# Eagle Application Configuration
# ---------------------------------------------
application {
- sink {
- type = org.apache.eagle.app.sink.KafkaStreamSink
+ stream {
+ provider = org.apache.eagle.app.messaging.KafkaStreamProvider
}
storm {
nimbusHost = "server.eagle.apache.org"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index 93a06f8..ba5914b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -23,7 +23,7 @@ import backtype.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.topology.storm.HealthCheckParseBolt;
import org.apache.eagle.topology.storm.TopologyCheckAppSpout;
import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
index 7860cb5..bf5e695 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -1,109 +1,109 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.topology;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
- private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
- private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
-
- private TopologyCheckAppConfig topologyCheckAppConfig;
-
- public TopologyCheckApplicationHealthCheck(Config config) {
- super(config);
- topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
- }
-
- @Override
- public Result check() {
- //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
- IEagleServiceClient client = new EagleServiceClientImpl(
- topologyCheckAppConfig.getConfig().getString("service.host"),
- topologyCheckAppConfig.getConfig().getInt("service.port"),
- topologyCheckAppConfig.getConfig().getString("service.username"),
- topologyCheckAppConfig.getConfig().getString("service.password"));
-
- client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
-
- String message = "";
- try {
- ApplicationEntity.Status status = getApplicationStatus();
- if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
- message += String.format("Application is not RUNNING, status is %s. ", status.toString());
- }
-
- long currentProcessTimeStamp = Math.min(
- Math.min(
- getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client),
- getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client)
- ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client));
- long currentTimeStamp = System.currentTimeMillis();
- long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
- if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
- maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
- }
-
- if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
- message += String.format("Current process time is %sms, delay %s minutes.",
- currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
- return Result.unhealthy(message);
- } else {
- return Result.healthy();
- }
- } catch (Exception e) {
- return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
- } finally {
- client.getJerseyClient().destroy();
- try {
- client.close();
- } catch (Exception e) {
- LOG.warn("{}", e);
- }
- }
- }
-
- private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception {
- String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}",
- serviceName,
- topologyCheckAppConfig.dataExtractorConfig.site);
-
- GenericServiceAPIResponseEntity response = client
- .search(query)
- .pageSize(10)
- .send();
-
- List<Map<List<String>, List<Double>>> results = response.getObj();
- if (results.size() == 0) {
- return Long.MAX_VALUE;
- }
- long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
- return currentProcessTimeStamp;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
+ private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
+
+ private TopologyCheckAppConfig topologyCheckAppConfig;
+
+ public TopologyCheckApplicationHealthCheck(Config config) {
+ super(config);
+ topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
+ }
+
+ @Override
+ public Result check() {
+ //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ topologyCheckAppConfig.getConfig().getString("service.host"),
+ topologyCheckAppConfig.getConfig().getInt("service.port"),
+ topologyCheckAppConfig.getConfig().getString("service.username"),
+ topologyCheckAppConfig.getConfig().getString("service.password"));
+
+ client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+
+ String message = "";
+ try {
+ ApplicationEntity.Status status = getApplicationStatus();
+ if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+ message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+ }
+
+ long currentProcessTimeStamp = Math.min(
+ Math.min(
+ getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client),
+ getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client)
+ ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client));
+ long currentTimeStamp = System.currentTimeMillis();
+ long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+ if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+ maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+ }
+
+ if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+ message += String.format("Current process time is %sms, delay %s minutes.",
+ currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
+ return Result.unhealthy(message);
+ } else {
+ return Result.healthy();
+ }
+ } catch (Exception e) {
+ return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+ } finally {
+ client.getJerseyClient().destroy();
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+
+ private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception {
+ String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}",
+ serviceName,
+ topologyCheckAppConfig.dataExtractorConfig.site);
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .pageSize(10)
+ .send();
+
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ if (results.size() == 0) {
+ return Long.MAX_VALUE;
+ }
+ long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ return currentProcessTimeStamp;
+ }
+}