You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:27 UTC

[3/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric Process and Persistence with Application Framework

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
index 7495ef5..40f08c1 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java
@@ -45,7 +45,7 @@ public class JDBCDataSourceProvider implements Provider<DataSource> {
             @Override
             public void run() {
                 try {
-                    LOGGER.info("Shutting down data source");
+                    LOGGER.info("Shutting down data fromStream");
                     datasource.close();
                 } catch (SQLException e) {
                     LOGGER.error("SQLException: {}", e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
index 6ddf3c6..02bf1cc 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java
@@ -210,10 +210,10 @@ public class ApplicationEntityServiceJDBCImpl implements ApplicationEntityServic
                 ExecutionRuntime runtime = ExecutionRuntimeManager.getInstance().getRuntime(
                     applicationProviderService.getApplicationProviderByType(entity.getDescriptor().getType()).getApplication().getEnvironmentType(), config);
                 StreamSinkConfig streamSinkConfig = runtime.environment()
-                    .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig);
+                    .stream().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig);
                 StreamDesc streamDesc = new StreamDesc();
                 streamDesc.setSchema(copied);
-                streamDesc.setSink(streamSinkConfig);
+                streamDesc.setSinkConfig(streamSinkConfig);
                 streamDesc.setStreamId(copied.getStreamId());
                 streamDesc.getSchema().setDataSource(entity.getAppId());
                 return streamDesc;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
index d06a3e4..fa59e8a 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
@@ -25,7 +25,8 @@ import java.io.Serializable;
 /**
  * Some common codes to enable DAO through eagle service including service host/post, credential population etc.
  */
-public class EagleServiceConnector implements Serializable{
+@Deprecated
+public class EagleServiceConnector implements Serializable {
     private final String eagleServiceHost;
     private final Integer eagleServicePort;
     private String username;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
index 912f1f7..f0b6283 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.service.client.impl;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.EagleServiceClientException;
@@ -33,14 +34,24 @@ import java.util.Map;
 public class EagleServiceClientImpl extends EagleServiceBaseClient {
     private final static Logger LOG = LoggerFactory.getLogger(EagleServiceClientImpl.class);
 
-    public EagleServiceClientImpl(String host, int port){
+    public EagleServiceClientImpl(String host, int port) {
         super(host, port);
     }
 
-    public EagleServiceClientImpl(EagleServiceConnector connector){
+    @Deprecated
+    public EagleServiceClientImpl(EagleServiceConnector connector) {
         this(connector.getEagleServiceHost(), connector.getEagleServicePort(), connector.getUsername(), connector.getPassword());
     }
 
+    public EagleServiceClientImpl (Config config) {
+        super(
+            config.hasPath("service.host") ? config.getString("service.host") : "localhost",
+            config.hasPath("service.port") ? config.getInt("service.port") : 9090,
+            config.hasPath("service.username") ? config.getString("service.username") : null,
+            config.hasPath("service.password") ? config.getString("service.password") : null
+        );
+    }
+
     public EagleServiceClientImpl(String host, int port, String username, String password){
         super(host, port, username, password);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index c78cd3c..f1f4e58 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -57,7 +57,7 @@ public class ExampleStormApplication extends StormApplication{
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index de9be89..0c9ba51 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -44,7 +44,7 @@
   },
   "application": {
     "sink": {
-      "type": "org.apache.eagle.app.sink.KafkaStreamSink",
+      "type": "org.apache.eagle.app.messaging.KafkaStreamSink",
       "config": {
         "kafkaBrokerHost": "",
         "kafkaZkConnection": ""

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
index a3052d8..1453c3e 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -29,7 +29,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
 import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
 import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index ad1521f..a3d1cb0 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -31,6 +31,20 @@
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
new file mode 100644
index 0000000..7f5e21b
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+import backtype.storm.generated.StormTopology;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.builder.CEPFunction;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+
+public class HadoopMetricMonitorApp extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        return environment.newApp(config)
+            .fromStream("HADOOP_JMX_METRIC_STREAM")
+            .saveAsMetric(MetricDefinition
+                .namedByField("metric")
+                .eventTimeByField("timestamp")
+                .dimensionFields("host","component","site")
+                .valueField("value"))
+            .toTopology();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
index e6ebde1..dc7ea97 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java
@@ -16,8 +16,14 @@
  */
 package org.apache.eagle.metric;
 
-import org.apache.eagle.app.StaticApplicationProvider;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
-public class HadoopMetricMonitorAppProdiver extends StaticApplicationProvider {
-    // Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+/**
+ * Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml.
+ */
+public class HadoopMetricMonitorAppProdiver extends AbstractApplicationProvider<HadoopMetricMonitorApp> {
+    @Override
+    public HadoopMetricMonitorApp getApplication() {
+        return new HadoopMetricMonitorApp();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 752c0cb..07270a5 100644
--- a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -21,7 +21,7 @@
     <name>Hadoop JMX Metric Monitor</name>
     <version>0.5.0-incubating</version>
     <configuration>
-        <!-- data source configurations -->
+        <!-- data fromStream configurations -->
         <property>
             <name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name>
             <displayName>JMX Metric Kafka Topic</displayName>
@@ -41,8 +41,6 @@
         <stream>
             <streamId>HADOOP_JMX_METRIC_STREAM</streamId>
             <description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description>
-            <validate>true</validate>
-            <timeseries>true</timeseries>
             <columns>
                 <column>
                     <name>host</name>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java b/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
deleted file mode 100644
index ee0b3c0..0000000
--- a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-
-import com.google.inject.Inject;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.test.ApplicationSimulator;
-import org.apache.eagle.app.test.ApplicationTestBase;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class HadoopMetricMonitorAppProdiverTest extends ApplicationTestBase {
-
-    @Inject
-    private SiteResource siteResource;
-    @Inject
-    private ApplicationResource applicationResource;
-    @Inject
-    private ApplicationSimulator simulator;
-    @Inject
-    ApplicationStatusUpdateService statusUpdateService;
-
-    @Test
-    public void testApplicationLifecycle() throws InterruptedException {
-        // Create local site
-        SiteEntity siteEntity = new SiteEntity();
-        siteEntity.setSiteId("test_site");
-        siteEntity.setSiteName("Test Site");
-        siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR");
-        siteResource.createSite(siteEntity);
-        Assert.assertNotNull(siteEntity.getUuid());
-
-        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL);
-        installOperation.setConfiguration(getConf());
-        // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
-        // Uninstall application
-        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
-        try {
-            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
-            Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
-        } catch (Exception ex) {
-            // Expected exception
-        }
-    }
-
-    private Map<String, Object> getConf() {
-        Map<String, Object> conf = new HashMap<>();
-        conf.put("dataSinkConfig.topic", "testTopic");
-        conf.put("dataSinkConfig.brokerList", "broker");
-        conf.put("dataSinkConfig.serializerClass", "serializerClass");
-        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
-        conf.put("dataSinkConfig.producerType", "async");
-        conf.put("dataSinkConfig.numBatchMessages", 4096);
-        conf.put("dataSinkConfig.maxQueueBufferMs", 5000);
-        conf.put("dataSinkConfig.requestRequiredAcks", 0);
-        conf.put("spoutNum", 2);
-        conf.put("mode", "LOCAL");
-        return conf;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
new file mode 100644
index 0000000..03ba4ee
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+public class HadoopMetricMonitorAppDebug {
+    public static void main(String[] args) {
+        new HadoopMetricMonitorApp().run(args);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
new file mode 100644
index 0000000..eb343d9
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HadoopMetricMonitorAppProviderTest extends ApplicationTestBase {
+
+    @Inject
+    private SiteResource siteResource;
+    @Inject
+    private ApplicationResource applicationResource;
+    @Inject
+    private ApplicationSimulator simulator;
+    @Inject
+    ApplicationStatusUpdateService statusUpdateService;
+
+    @Test
+    public void testApplicationLifecycle() throws InterruptedException {
+        // Create local site
+        SiteEntity siteEntity = new SiteEntity();
+        siteEntity.setSiteId("test_site");
+        siteEntity.setSiteName("Test Site");
+        siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR");
+        siteResource.createSite(siteEntity);
+        Assert.assertNotNull(siteEntity.getUuid());
+
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
+        // Install application
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
+        // Uninstall application
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+        try {
+            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
+            Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
+        } catch (Exception ex) {
+            // Expected exception
+        }
+    }
+
+    private Map<String, Object> getConf() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "testTopic");
+        conf.put("dataSinkConfig.brokerList", "broker");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("dataSinkConfig.producerType", "async");
+        conf.put("dataSinkConfig.numBatchMessages", 4096);
+        conf.put("dataSinkConfig.maxQueueBufferMs", 5000);
+        conf.put("dataSinkConfig.requestRequiredAcks", 0);
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
new file mode 100644
index 0000000..67b94f8
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigFactory;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.commons.io.IOUtils;
+import org.apache.eagle.app.messaging.KafkaStreamProvider;
+import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+public class SendSampleDataToKafka {
+    public static void main(String[] args) throws URISyntaxException, IOException {
+        KafkaStreamSinkConfig config = new KafkaStreamProvider().getSinkConfig("HADOOP_JMX_METRIC_STREAM",ConfigFactory.load());
+        Properties properties = new Properties();
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        // newly added properties for the async producer
+        properties.put("producer.type", config.getProducerType());
+        properties.put("batch.num.messages", config.getNumBatchMessages());
+        properties.put("request.required.acks", config.getRequestRequiredAcks());
+        properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs());
+        ProducerConfig producerConfig = new ProducerConfig(properties);
+        kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(producerConfig);
+        try {
+            InputStream is = SendSampleDataToKafka.class.getResourceAsStream("hadoop_jmx_metric_sample.json");
+            Preconditions.checkNotNull(is, "hadoop_jmx_metric_sample.json");
+            String value = IOUtils.toString(is);
+            producer.send(new KeyedMessage(config.getTopicId(), value));
+        } finally {
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-hadoop-metric/src/test/resources/application.conf
new file mode 100644
index 0000000..8ff6016
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/resources/application.conf
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+
+{
+  service {
+    env = "testing"
+    host = "localhost"
+    port = 9090
+    username = "admin"
+    password = "secret"
+    readTimeOutSeconds = 60
+    context = "/rest"
+    timezone = "UTC"
+  }
+  
+  "appId" : "HadoopJmxAppForTest",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
+  "dataSourceConfig": {
+    "topic" : "hadoop_jmx_metric",
+    "zkConnection" : "localhost:2181",
+    "txZkServers" : "localhost:2181"
+  }
+  "dataSinkConfig": {
+    "topic" : "hadoop_jmx_metric",
+    "brokerList" : "localhost:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
+    "producerType" : "async",
+    "numBatchMessages" : "4096",
+    "maxQueueBufferMs" : "5000",
+    "requestRequiredAcks" : "0"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
new file mode 100644
index 0000000..f0f62f2
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
@@ -0,0 +1,8 @@
+{
+  "host":"localhost",
+  "timestamp": 1480319107,
+  "metric": "hadoop.cpu.usage",
+  "component": "namenode",
+  "site": "test",
+  "value": 0.96
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6a7535c..6471dfc 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -1,102 +1,102 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.hadoop.queue;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
-    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
-    private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
-
-    private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
-
-    public HadoopQueueRunningApplicationHealthCheck(Config config) {
-        super(config);
-        this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
-    }
-
-    @Override
-    public Result check() {
-        HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
-        IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleService.host,
-                eagleServiceConfig.eagleService.port,
-                eagleServiceConfig.eagleService.username,
-                eagleServiceConfig.eagleService.password);
-
-        client.getJerseyClient().setReadTimeout(60000);
-
-        String message = "";
-        try {
-            ApplicationEntity.Status status = getApplicationStatus();
-            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
-                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
-            }
-
-
-            String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
-                    Constants.GENERIC_METRIC_SERVICE,
-                    hadoopQueueRunningAppConfig.eagleProps.site);
-
-            GenericServiceAPIResponseEntity response = client
-                    .search(query)
-                    .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
-                    .startTime(System.currentTimeMillis() - 24 * 60 * 60000L)
-                    .endTime(System.currentTimeMillis())
-                    .pageSize(10)
-                    .send();
-            List<Map<List<String>, List<Double>>> results = response.getObj();
-            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
-            long currentTimeStamp = System.currentTimeMillis();
-            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
-            if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
-                maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
-            }
-
-            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
-                message += String.format("Current process time is %sms, delay %s minutes.",
-                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
-                return Result.unhealthy(message);
-            } else {
-                return Result.healthy();
-            }
-        } catch (Exception e) {
-            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
-        } finally {
-            client.getJerseyClient().destroy();
-            try {
-                client.close();
-            } catch (Exception e) {
-                LOG.warn("{}", e);
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Health check for the Hadoop queue running application: unhealthy when the application is not RUNNING
+ * or when the newest HADOOP_CLUSTER_ALLOCATED_MEMORY timestamp for the site lags beyond the allowed delay.
+ */
+public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class);
+    private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; // 10 minutes, in milliseconds
+
+    private final HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig;
+
+    public HadoopQueueRunningApplicationHealthCheck(Config config) {
+        super(config);
+        this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config);
+    }
+
+    /**
+     * Asks the Eagle service for max(timestamp) of the metric over the last 24 hours and compares it
+     * with the current time; the limit is read from MAX_DELAY_TIME_KEY when configured, else the default.
+     *
+     * @return healthy when the status is RUNNING and the lag is within the limit; otherwise unhealthy
+     *         with a message, also when any exception is thrown while fetching the status or querying
+     */
+    @Override
+    public Result check() {
+        HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps;
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleService.host, eagleServiceConfig.eagleService.port,
+                eagleServiceConfig.eagleService.username, eagleServiceConfig.eagleService.password);
+        client.getJerseyClient().setReadTimeout(60000);
+
+        String message = "";
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}",
+                    Constants.GENERIC_METRIC_SERVICE, hadoopQueueRunningAppConfig.eagleProps.site);
+            GenericServiceAPIResponseEntity response = client.search(query)
+                    .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY)
+                    .startTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis())
+                    .pageSize(10).send();
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)
+                    ? hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY) : DEFAULT_MAX_DELAY_TIME;
+
+            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+                message += String.format("Current process time is %sms, delay %s minutes.",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
+                return Result.unhealthy(message);
+            }
+            return Result.healthy();
+        } catch (Exception e) {
+            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close eagle service client", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
index f19c366..e145cf3 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java
@@ -76,7 +76,7 @@ public class AggregationSpout extends BaseRichSpout {
             //1, get last updateTime;
             lastUpdateTime = AggregationTimeManager.instance().readLastFinishTime();
             if (lastUpdateTime == 0L) {
-                //init state, just set to currentTime - 18 hours
+                //prepare state, just set to currentTime - 18 hours
                 lastUpdateTime = (currentJobTime - (MAX_SAFE_TIME + MAX_WAIT_TIME)) / 3600000 * 3600000;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index 907ccdb..66906f0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -17,10 +17,9 @@
 package org.apache.eagle.jpm.mr.history;
 
 import backtype.storm.topology.BoltDeclarer;
-import com.codahale.metrics.health.HealthCheck;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
@@ -37,11 +36,11 @@ import java.util.regex.Pattern;
 public class MRHistoryJobApplication extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
-        //1. trigger init conf
+        //1. trigger prepare conf
         MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(config);
         com.typesafe.config.Config jhfAppConf = appConfig.getConfig();
 
-        //2. init JobHistoryContentFilter
+        //2. prepare JobHistoryContentFilter
         final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
         String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(",");
         List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length);
@@ -60,7 +59,7 @@ public class MRHistoryJobApplication extends StormApplication {
             builder.includeJobKeyPatterns(Pattern.compile(key));
         }
         JobHistoryContentFilter filter = builder.build();
-        //3. init topology
+        //3. prepare topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrHistoryJobSpout";
         int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index e5c7c87..de0d846 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -32,7 +32,7 @@ import java.util.List;
 public class MRRunningJobApplication extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
-        //1. trigger init conf
+        //1. trigger prepare conf
         MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
 
         String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
@@ -46,7 +46,7 @@ public class MRRunningJobApplication extends StormApplication {
         confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
         confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
 
-        //2. init topology
+        //2. prepare topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrRunningJobFetchSpout";
         String boltName = "mrRunningJobParseBolt";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
index 66da734..1b7b8ed 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
@@ -1,4 +1,4 @@
-\ufeff<!--
+<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
index d6f5031..fdfcaad 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -1,99 +1,99 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
-    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
-
-    private SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
-
-    public SparkHistoryJobApplicationHealthCheck(Config config) {
-        super(config);
-        this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
-    }
-
-    @Override
-    public Result check() {
-        SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
-        IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.host,
-                eagleServiceConfig.port,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
-
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
-
-        String message = "";
-        try {
-            ApplicationEntity.Status status = getApplicationStatus();
-            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
-                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
-            }
-
-            String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
-                    Constants.SPARK_APP_SERVICE_ENDPOINT_NAME,
-                    sparkHistoryJobAppConfig.stormConfig.siteId);
-
-            GenericServiceAPIResponseEntity response = client
-                    .search(query)
-                    .startTime(System.currentTimeMillis() - 12 * 60 * 60000L)
-                    .endTime(System.currentTimeMillis())
-                    .pageSize(10)
-                    .send();
-
-            List<Map<List<String>, List<Double>>> results = response.getObj();
-            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
-            long currentTimeStamp = System.currentTimeMillis();
-            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
-            if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
-                maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
-            }
-
-            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
-                message += String.format("Current process time is %sms, delay %s hours.",
-                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
-                return Result.unhealthy(message);
-            } else {
-                return Result.healthy();
-            }
-        } catch (Exception e) {
-            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
-        } finally {
-            client.getJerseyClient().destroy();
-            try {
-                client.close();
-            } catch (Exception e) {
-                LOG.warn("{}", e);
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Health check for the Spark history job application: unhealthy when the application is not RUNNING
+ * or when the newest endTime stored for the site lags beyond three times the allowed delay.
+ */
+public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class);
+
+    private final SparkHistoryJobAppConfig sparkHistoryJobAppConfig;
+
+    public SparkHistoryJobApplicationHealthCheck(Config config) {
+        super(config);
+        this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config);
+    }
+
+    /**
+     * Asks the Eagle service for max(endTime) of the Spark app entities over the last 12 hours and compares
+     * it with the current time; the delay is read from MAX_DELAY_TIME_KEY when configured, else the default.
+     *
+     * @return healthy when the status is RUNNING and the lag is within 3 x the delay; otherwise unhealthy
+     *         with a message, also when any exception is thrown while fetching the status or querying
+     */
+    @Override
+    public Result check() {
+        SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo;
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.host, eagleServiceConfig.port, eagleServiceConfig.username, eagleServiceConfig.password);
+        client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+
+        String message = "";
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+            }
+
+            String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}",
+                    Constants.SPARK_APP_SERVICE_ENDPOINT_NAME, sparkHistoryJobAppConfig.stormConfig.siteId);
+            GenericServiceAPIResponseEntity response = client.search(query)
+                    .startTime(System.currentTimeMillis() - 12 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis())
+                    .pageSize(10).send();
+
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)
+                    ? sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY) : DEFAULT_MAX_DELAY_TIME;
+
+            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) {
+                message += String.format("Current process time is %sms, delay %s hours.",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60);
+                return Result.unhealthy(message);
+            }
+            return Result.healthy();
+        } catch (Exception e) {
+            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+        } finally {
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close eagle service client", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 209481a..0e1aacd 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -29,10 +29,10 @@ import com.typesafe.config.Config;
 public class SparkRunningJobApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
-        //1. trigger init conf
+        //1. trigger prepare conf
         SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.newInstance(config);
 
-        //2. init topology
+        //2. prepare topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         final String spoutName = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME;
         final String boltName = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index c5c0388..11a22e5 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -25,7 +25,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
 import storm.kafka.StringScheme;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index a1daf89..6d7022b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -24,18 +24,16 @@ import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.partition.*;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
-import storm.kafka.StringScheme;
 
 /**
  * Since 8/10/16.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
index 7a4509b..4df4a5b 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java
@@ -26,7 +26,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.security.hive.jobrunning.HiveJobRunningSourcedStormSpoutProvider;
 import org.apache.eagle.security.hive.jobrunning.HiveQueryParserBolt;
 import org.apache.eagle.security.hive.jobrunning.JobFilterBolt;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
index c1c3033..32dcc30 100644
--- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
+++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
@@ -19,7 +19,7 @@ package org.apache.eagle.security.oozie.parse;
 
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
 import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinBolt;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index 5f6c240..705ef6f 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -83,8 +83,8 @@ metadata {
 # Eagle Application Configuration
 # ---------------------------------------------
 application {
-  sink {
-    type = org.apache.eagle.app.sink.KafkaStreamSink
+  stream {
+    provider = org.apache.eagle.app.messaging.KafkaStreamProvider
   }
   storm {
     nimbusHost = "server.eagle.apache.org"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index ce68550..20f5b2e 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -85,8 +85,8 @@ metadata {
 # Eagle Application Configuration
 # ---------------------------------------------
 application {
-  sink {
-    type = org.apache.eagle.app.sink.KafkaStreamSink
+  stream {
+    provider = org.apache.eagle.app.messaging.KafkaStreamProvider
   }
   storm {
     nimbusHost = "server.eagle.apache.org"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index 93a06f8..ba5914b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -23,7 +23,7 @@ import backtype.storm.topology.TopologyBuilder;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.topology.storm.HealthCheckParseBolt;
 import org.apache.eagle.topology.storm.TopologyCheckAppSpout;
 import org.apache.eagle.topology.storm.TopologyDataPersistBolt;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
index 7860cb5..bf5e695 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -1,109 +1,109 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.topology;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
-    private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
-    private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L;
-
-    private TopologyCheckAppConfig topologyCheckAppConfig;
-
-    public TopologyCheckApplicationHealthCheck(Config config) {
-        super(config);
-        topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
-    }
-
-    @Override
-    public Result check() {
-        //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
-        IEagleServiceClient client = new EagleServiceClientImpl(
-                topologyCheckAppConfig.getConfig().getString("service.host"),
-                topologyCheckAppConfig.getConfig().getInt("service.port"),
-                topologyCheckAppConfig.getConfig().getString("service.username"),
-                topologyCheckAppConfig.getConfig().getString("service.password"));
-
-        client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
-
-        String message = "";
-        try {
-            ApplicationEntity.Status status = getApplicationStatus();
-            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
-                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
-            }
-
-            long currentProcessTimeStamp = Math.min(
-                    Math.min(
-                        getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client),
-                        getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client)
-                    ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client));
-            long currentTimeStamp = System.currentTimeMillis();
-            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
-            if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
-                maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
-            }
-
-            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
-                message += String.format("Current process time is %sms, delay %s minutes.",
-                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
-                return Result.unhealthy(message);
-            } else {
-                return Result.healthy();
-            }
-        } catch (Exception e) {
-            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
-        } finally {
-            client.getJerseyClient().destroy();
-            try {
-                client.close();
-            } catch (Exception e) {
-                LOG.warn("{}", e);
-            }
-        }
-    }
-
-    private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception {
-        String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}",
-                serviceName,
-                topologyCheckAppConfig.dataExtractorConfig.site);
-
-        GenericServiceAPIResponseEntity response = client
-                .search(query)
-                .pageSize(10)
-                .send();
-
-        List<Map<List<String>, List<Double>>> results = response.getObj();
-        if (results.size() == 0) {
-            return Long.MAX_VALUE;
-        }
-        long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
-        return currentProcessTimeStamp;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class);
+    private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; // 10 minutes in ms; fallback when MAX_DELAY_TIME_KEY is not configured
+    // App config wrapper, built from the Config handed to the constructor.
+    private TopologyCheckAppConfig topologyCheckAppConfig;
+
+    public TopologyCheckApplicationHealthCheck(Config config) {
+        super(config);
+        topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
+    }
+    // Unhealthy when the app status is not RUNNING, when the oldest max(lastUpdateTime) of the three *_INSTANCE_SERVICE_NAME services lags "now" by more than maxDelayTime, or when any exception is caught.
+    @Override
+    public Result check() {
+        //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                topologyCheckAppConfig.getConfig().getString("service.host"),
+                topologyCheckAppConfig.getConfig().getInt("service.port"),
+                topologyCheckAppConfig.getConfig().getString("service.username"),
+                topologyCheckAppConfig.getConfig().getString("service.password"));
+        // The config key is in seconds; it is multiplied by 1000 before being passed on (presumably setReadTimeout expects milliseconds — confirm against the Jersey client API).
+        client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+        // Accumulates the reasons for an unhealthy result; empty means nothing wrong has been found so far.
+        String message = "";
+        try {
+            ApplicationEntity.Status status = getApplicationStatus();
+            if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) {
+                message += String.format("Application is not RUNNING, status is %s. ", status.toString());
+            }
+            // Smallest (i.e. oldest) of the three per-service timestamps; a service with no rows yields Long.MAX_VALUE and therefore never wins the min.
+            long currentProcessTimeStamp = Math.min(
+                    Math.min(
+                        getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client),
+                        getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client)
+                    ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client));
+            long currentTimeStamp = System.currentTimeMillis();
+            long maxDelayTime = DEFAULT_MAX_DELAY_TIME;
+            if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) {
+                maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY);
+            }
+            // NOTE(review): if no service returned rows, currentProcessTimeStamp is Long.MAX_VALUE, the difference below is negative, and a RUNNING app is reported healthy — confirm this is intended.
+            if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) {
+                message += String.format("Current process time is %sms, delay %s minutes.",
+                        currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L);
+                return Result.unhealthy(message);
+            } else {
+                return Result.healthy();
+            }
+        } catch (Exception e) {
+            return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
+        } finally {
+            client.getJerseyClient().destroy(); // NOTE(review): outside the inner try, so an exception here would skip client.close()
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e); // NOTE(review): SLF4J likely treats the lone Throwable as the exception, leaving "{}" unsubstituted — consider a real message
+            }
+        }
+    }
+    // Queries max(lastUpdateTime) for serviceName at the configured site; returns Long.MAX_VALUE when the response holds no rows.
+    private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception {
+        String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}",
+                serviceName,
+                topologyCheckAppConfig.dataExtractorConfig.site);
+
+        GenericServiceAPIResponseEntity response = client
+                .search(query)
+                .pageSize(10)
+                .send();
+        // NOTE(review): the map is declared with List<String> keys but is looked up with the String "value" below — presumably this only works through erasure at runtime; verify the real response shape.
+        List<Map<List<String>, List<Double>>> results = response.getObj();
+        if (results.size() == 0) {
+            return Long.MAX_VALUE;
+        }
+        long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+        return currentProcessTimeStamp;
+    }
+}