You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/02/16 05:24:22 UTC

[2/2] eagle git commit: [EAGLE-895] Improve alert engine metadata to organize by siteId

[EAGLE-895] Improve alert engine metadata to organize by siteId

https://issues.apache.org/jira/browse/EAGLE-895

Author: Hao Chen <ha...@apache.org>

Closes #801 from haoch/AddPolicySiteId.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/49ca3b0e
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/49ca3b0e
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/49ca3b0e

Branch: refs/heads/master
Commit: 49ca3b0ec481f6fcfbf339cb6d3b63b4dede1011
Parents: 7681287
Author: Hao Chen <ha...@apache.org>
Authored: Thu Feb 16 13:24:08 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Feb 16 13:24:08 2017 +0800

----------------------------------------------------------------------
 .../eagle/alert/app/AlertEagleStorePlugin.java  |  16 +-
 .../app/AlertUnitTopologyAppProviderTest.java   |   2 +-
 .../engine/coordinator/PolicyDefinition.java    |  20 +-
 .../engine/coordinator/PublishmentType.java     |  73 +-
 .../coordinator/PolicyDefinitionTest.java       |   2 +-
 .../publisher/AlertPublishPluginProvider.java   |  24 +
 .../publisher/PublishementTypeLoader.java       |  58 ++
 .../publisher/impl/AlertEagleStorePlugin.java   |  12 +-
 .../publisher/impl/AlertEmailPublisher.java     |  16 +-
 .../publisher/impl/AlertFilePublisher.java      |  13 +-
 .../publisher/impl/AlertKafkaPublisher.java     |  17 +-
 .../publisher/impl/AlertSlackPublisher.java     |  17 +-
 .../publisher/PublishementTypeLoaderTest.java   |  27 +
 .../metadata/resource/MetadataResource.java     |  51 +-
 .../resource/StreamDefinitionWrapper.java       |  72 ++
 .../eagle/alert/metadata/IMetadataDao.java      |   8 +
 .../environment/impl/StormExecutionRuntime.java |   3 +-
 .../ApplicationStatusUpdateServiceImpl.java     |  73 +-
 .../eagle/app/spi/ApplicationProvider.java      |   2 +-
 .../app/test/ApplicationSimulatorImpl.java      |   3 +-
 .../eagle/app/test/ApplicationTestBase.java     |  25 +
 .../app/resource/ApplicationResourceTest.java   |   2 +-
 eagle-core/eagle-common/pom.xml                 |   4 +
 .../eagle/common/utils/ReflectionsHelper.java   |  48 ++
 .../eagle/service/hbase/EmbeddedHbase.java      |   7 +-
 .../eagle/service/hbase/EmbeddedHbaseTest.java  |   6 +-
 .../eagle/service/hbase/TestHBaseBase.java      |  44 +-
 .../service/ApplicationStatusUpdateService.java |   2 -
 .../client/impl/EagleServiceClientImpl.java     |   9 +-
 .../eagle/service/client/ClientTestBase.java    |   2 +-
 .../eagle-query/eagle-entity-base/pom.xml       |  28 +-
 .../entity/repo/EntityRepositoryScanner.java    |  92 ++-
 .../TestGenericEntityIndexStreamReader.java     |  14 +-
 .../eagle/log/entity/TestTestLogAPIEntity.java  | 735 ++++++++++---------
 .../repo/TestEntityRepositoryScanner.java       |   5 +-
 .../eagle/storage/hbase/TestHBaseStatement.java |  17 +-
 .../storage/hbase/TestWithHBaseCoprocessor.java |  77 ++
 .../coprocessor/TestGroupAggregateClient.java   |  60 +-
 .../TestGroupAggregateTimeSeriesClient.java     |  26 +-
 .../storage/hbase/spi/TestHBaseStorage.java     |  22 +-
 ...estHBaseStorageAggregateWithCoprocessor.java |  36 +-
 .../hbase/spi/TestHBaseStorageLoader.java       |  17 +-
 .../src/test/resources/log4j.properties         |   2 +-
 .../example/ExampleApplicationProviderTest.java |   5 +-
 .../eagle/app/jpm/JPMWebApplicationTest.java    |   1 +
 .../auditlog/TestHdfsAuditLogApplication.java   |   7 +-
 .../apache/eagle/server/ServerApplication.java  |   8 +
 pom.xml                                         |  14 +-
 48 files changed, 1190 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
index 30d2b78..0b58bf7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
@@ -19,7 +19,9 @@ package org.apache.eagle.alert.app;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.impl.AbstractPublishPlugin;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.apache.eagle.metadata.model.AlertEntity;
@@ -36,15 +38,14 @@ import java.util.Map;
 
 import static org.apache.eagle.alert.engine.model.AlertPublishEvent.*;
 
-public class AlertEagleStorePlugin extends AbstractPublishPlugin {
+public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider {
     private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
     private IEagleServiceClient client;
 
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws Exception {
         super.init(config, publishment, conf);
-        client = new EagleServiceClientImpl(config.getString("service.host"), config.getInt("service.port"),
-                config.getString("service.username"), config.getString("service.password"));
+        client = new EagleServiceClientImpl(config);
     }
 
     @Override
@@ -94,4 +95,13 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin {
         alertEvent.setTags(tags);
         return alertEvent;
     }
+
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("HBaseStorage")
+                .type(getClass())
+                .description("HBase Storage alert publisher")
+                .build();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
index 927d505..4383484 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
@@ -56,7 +56,7 @@ public class AlertUnitTopologyAppProviderTest extends ApplicationTestBase {
         statusUpdateService.updateApplicationEntityStatus(applicationEntity);
         // Stop application
         applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
-        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        awaitApplicationStop(applicationEntity);
         // Uninstall application
         applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
         try {

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 7398dd5..c377e41 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -35,8 +35,9 @@ public class PolicyDefinition implements Serializable {
     @Length(min = 1, max = 50, message = "length should between 1 and 50")
     private String name;
     private String description;
-    private List<String> inputStreams = new ArrayList<String>();
-    private List<String> outputStreams = new ArrayList<String>();
+    private List<String> inputStreams = new ArrayList<>();
+    private List<String> outputStreams = new ArrayList<>();
+    private String siteId = "default";
 
     private Definition definition;
     private Definition stateDefinition;
@@ -137,6 +138,7 @@ public class PolicyDefinition implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
+                .append(siteId)
                 .append(name)
                 .append(inputStreams)
                 .append(outputStreams)
@@ -160,7 +162,8 @@ public class PolicyDefinition implements Serializable {
 
         PolicyDefinition another = (PolicyDefinition) that;
 
-        if (Objects.equals(another.name, this.name)
+        if (Objects.equals(another.siteId, this.siteId)
+                && Objects.equals(another.name, this.name)
                 && Objects.equals(another.description, this.description)
                 && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
                 && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
@@ -191,6 +194,14 @@ public class PolicyDefinition implements Serializable {
         return alertDefinition == null ? null : alertDefinition.getCategory();
     }
 
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
     @JsonIgnoreProperties(ignoreUnknown = true)
     public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;
@@ -294,9 +305,8 @@ public class PolicyDefinition implements Serializable {
         ENABLED, DISABLED
     }
 
-
     @Override
     public String toString() {
-        return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
+        return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index 5bd15bc..f7025f2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -21,16 +21,25 @@ package org.apache.eagle.alert.engine.coordinator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class PublishmentType {
     private String name;
+
+    @Override
+    public String toString() {
+        return "PublishmentType{"
+                + "name='" + name + '\''
+                + ", type='" + type + '\''
+                + ", description='" + description + '\''
+                + ", fields=" + fields
+                + '}';
+    }
+
     private String type;
     private String description;
-    private List<Map<String, String>> fields;
+    private List<Map<String, String>> fields = new LinkedList<>();
 
     public String getName() {
         return name;
@@ -64,6 +73,8 @@ public class PublishmentType {
         this.fields = fields;
     }
 
+
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof PublishmentType) {
@@ -85,4 +96,56 @@ public class PublishmentType {
             .append(fields)
             .build();
     }
-}
+
+
+    public static class Builder {
+        private final PublishmentType publishmentType;
+
+        public Builder() {
+            this.publishmentType = new PublishmentType();
+        }
+
+        public Builder type(Class<?> typeClass) {
+            this.publishmentType.setType(typeClass.getName());
+            return this;
+        }
+
+        public Builder name(String name) {
+            this.publishmentType.setName(name);
+            return this;
+        }
+
+        public Builder description(String description) {
+            this.publishmentType.setDescription(description);
+            return this;
+        }
+
+        public Builder field(Map<String,String> fieldDesc) {
+            this.publishmentType.getFields().add(fieldDesc);
+            return this;
+        }
+
+        public Builder field(String name, String value) {
+            this.publishmentType.getFields().add(new HashMap<String,String>() {
+                {
+                    put("name", name);
+                    put("value", value);
+                }
+            });
+            return this;
+        }
+
+        public Builder field(String name) {
+            this.publishmentType.getFields().add(new HashMap<String,String>() {
+                {
+                    put("name", name);
+                }
+            });
+            return this;
+        }
+
+        public PublishmentType build() {
+            return this.publishmentType;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
index 7acb4f7..77b3517 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -76,7 +76,7 @@ public class PolicyDefinitionTest {
         sp.setColumns(Arrays.asList("host"));
         sp.setType(StreamPartition.Type.GROUPBY);
         pd.addPartition(sp);
-        Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+        Assert.assertEquals("{site=\"default\", name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
 
         PolicyDefinition pd1 = new PolicyDefinition();
         PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
new file mode 100644
index 0000000..77eea40
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+
+public interface AlertPublishPluginProvider {
+    PublishmentType getPluginType();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
new file mode 100644
index 0000000..820d70e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.common.utils.ReflectionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class PublishementTypeLoader {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PublishementTypeLoader.class);
+
+    private final List<PublishmentType> publishmentTypeSet;
+
+    private PublishementTypeLoader() {
+        this.publishmentTypeSet = new LinkedList<>();
+        LOGGER.info("Loading alert publish plugins ...");
+        for (Class<? extends AlertPublishPluginProvider> clazz: ReflectionsHelper.getInstance().getSubTypesOf(AlertPublishPluginProvider.class)) {
+            LOGGER.debug("Loading alert publish plugin: {}", clazz);
+            try {
+                PublishmentType type = clazz.newInstance().getPluginType();
+                this.publishmentTypeSet.add(type);
+                LOGGER.info("Loaded alert publish plugin {}:{}", type.getName(), type.getType());
+            } catch (InstantiationException | IllegalAccessException e) {
+                LOGGER.error("Failed to get instantiate alert publish plugin provider: {}", clazz, e);
+            }
+        }
+        LOGGER.info("Loaded {} alert publish plugins", this.publishmentTypeSet.size());
+    }
+
+    private static final PublishementTypeLoader INSTANCE = new PublishementTypeLoader();
+
+    public static List<PublishmentType> loadPublishmentTypes() {
+        return INSTANCE.getPublishmentTypes();
+    }
+
+    public List<PublishmentType> getPublishmentTypes() {
+        return publishmentTypeSet;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
index 48c3663..b410cda 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
@@ -20,8 +20,10 @@ package org.apache.eagle.alert.engine.publisher.impl;
 
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
@@ -34,7 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class AlertEagleStorePlugin extends AbstractPublishPlugin {
+public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider {
 
     private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
     private transient IMetadataServiceClient client;
@@ -72,4 +74,12 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin {
         return LOG;
     }
 
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("JDBCStorage")
+                .type(getClass())
+                .description("Publish alerts into eagle metadata store")
+                .build();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index d08d114..152a9f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -19,7 +19,9 @@
 package org.apache.eagle.alert.engine.publisher.impl;
 
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
@@ -41,7 +43,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
 import static org.apache.eagle.common.mail.AlertEmailConstants.*;
 
-public class AlertEmailPublisher extends AbstractPublishPlugin {
+public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
     private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
@@ -206,4 +208,16 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
     protected Logger getLogger() {
         return LOG;
     }
+
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("Email")
+                .type(AlertEmailPublisher.class)
+                .description("Email alert publisher")
+                .field("subject")
+                .field("sender")
+                .field("recipients")
+                .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
index 1848979..375a0da 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
@@ -19,8 +19,10 @@
 package org.apache.eagle.alert.engine.publisher.impl;
 
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.common.DateTimeUtil;
 
@@ -33,7 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.logging.*;
 
-public class AlertFilePublisher extends AbstractPublishPlugin {
+public class AlertFilePublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
 
     private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName());
     private FileHandler handler;
@@ -67,6 +69,15 @@ public class AlertFilePublisher extends AbstractPublishPlugin {
         filelogger.setUseParentHandlers(false);
     }
 
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("File")
+                .type(AlertFilePublisher.class)
+                .description("Local log file publisher")
+                .build();
+    }
+
     class AlertFileFormatter extends Formatter {
 
         @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index e48f2eb..adac1aa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -26,7 +26,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
-public class AlertKafkaPublisher extends AbstractPublishPlugin {
+public class AlertKafkaPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
     private static final long MAX_TIMEOUT_MS = 60000;
@@ -181,4 +183,15 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
     protected Logger getLogger() {
         return LOG;
     }
-}
+
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("Kafka")
+                .type(getClass())
+                .description("Kafka alert publisher")
+                .field("kafka_broker","localhost:9092")
+                .field("topic")
+                .build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
index 6ce6ed7..0d60246 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
@@ -25,8 +25,10 @@ import com.ullink.slack.simpleslackapi.SlackSession;
 import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
 import org.apache.commons.lang.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +41,7 @@ import java.util.Map;
 /**
  * @since Sep 14, 2016.
  */
-public class AlertSlackPublisher extends AbstractPublishPlugin {
+public class AlertSlackPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
     private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class);
 
     private SlackSession session;
@@ -155,4 +157,23 @@ public class AlertSlackPublisher extends AbstractPublishPlugin {
         SlackChannel channel = session.findChannelByName(channelName);
         session.sendMessage(channel, message, attachment);
     }
+
+    /**
+     * Describes this publisher as a {@link PublishmentType} named "Slack" whose type is this class.
+     * It declares four fields: {@code token}, {@code channels}, {@code severitys} and {@code urltemplate}.
+     * NOTE(review): {@code severitys} is presumably spelled to match the property key the publisher
+     * reads; confirm before renaming it.
+     */
+    @Override
+    public PublishmentType getPluginType() {
+        return new PublishmentType.Builder()
+                .name("Slack")
+                .type(getClass())
+                .description("Slack alert publisher")
+                .field("token")
+                .field("channels")
+                .field("severitys")
+                .field("urltemplate")
+                .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
new file mode 100644
index 0000000..3df5fc8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link PublishementTypeLoader}.
+ */
+public class PublishementTypeLoaderTest {
+    @Test
+    public void testPublishmentTypeLoader() {
+        // Only verifies that loading completes without throwing; the returned types are not asserted.
+        PublishementTypeLoader.loadPublishmentTypes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 617b4f0..2d30e85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.service.metadata.resource;
 
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -25,19 +28,20 @@ import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
 import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;
 import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.engine.publisher.PublishementTypeLoader;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
 import javax.validation.Valid;
 import javax.ws.rs.*;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @since Apr 11, 2016.
@@ -135,6 +139,40 @@ public class MetadataResource {
         return dao.createStream(stream);
     }
 
+    /**
+     * Creates a stream definition together with its data source in one call.
+     *
+     * <p>The wrapper is validated and filled with defaults first, then the stream definition and
+     * the data source are written through the DAO. Both writes are always attempted; the second
+     * is not skipped, and the first is not rolled back, when the other one fails.</p>
+     *
+     * @param stream wrapper carrying the stream definition and its stream source; neither may be null
+     * @return a success result naming the created stream and data source, or a failure result
+     *         carrying the messages of both DAO operations
+     */
+    @Path("/streams/create")
+    @POST
+    public OpResult createStream(StreamDefinitionWrapper stream) {
+        Preconditions.checkNotNull(stream, "Stream wrapper is null");
+        Preconditions.checkNotNull(stream.getStreamDefinition(),"Stream definition is null");
+        Preconditions.checkNotNull(stream.getStreamSource(),"Stream source is null");
+        stream.validateAndEnsureDefault();
+        OpResult createStreamResult = dao.createStream(stream.getStreamDefinition());
+        OpResult createDataSourceResult = dao.addDataSource(stream.getStreamSource());
+        // TODO: Check kafka topic exist or not.
+        if (createStreamResult.code == OpResult.SUCCESS
+                && createDataSourceResult.code == OpResult.SUCCESS) {
+            return OpResult.success("Successfully create stream "
+                    + stream.getStreamDefinition().getStreamId()
+                    + ", and datasource "
+                    + stream.getStreamSource().getName());
+        } else {
+            // Report the outcome of both operations so the caller can see which one failed.
+            return OpResult.fail("Error: "
+                    + StringUtils.join(new String[]{createStreamResult.message, createDataSourceResult.message},","));
+        }
+    }
+
     @Path("/streams/batch")
     @POST
     public List<OpResult> addStreams(List<StreamDefinition> streams) {
@@ -201,8 +226,19 @@ public class MetadataResource {
 
+    /**
+     * Lists policy definitions.
+     *
+     * @param siteId optional {@code siteId} query parameter; when non-null only the policies
+     *               returned by {@code dao.getPoliciesBySiteId(siteId)} are listed, otherwise all policies
+     * @return the matching policy definitions
+     */
     @Path("/policies")
     @GET
-    public List<PolicyDefinition> listPolicies() {
-        return dao.listPolicies();
+    public List<PolicyDefinition> listPolicies(@QueryParam("siteId") String siteId) {
+        if (siteId != null) {
+            return dao.getPoliciesBySiteId(siteId);
+        } else {
+            return dao.listPolicies();
+        }
     }
 
     @Path("/policies")
@@ -281,7 +310,7 @@ public class MetadataResource {
         try {
             PolicyDefinition policyDefinition = getPolicyById(policyId);
             policyDefinition.setPolicyStatus(status);
-            OpResult updateResult  = addPolicy(policyDefinition);
+            OpResult updateResult = addPolicy(policyDefinition);
             result.code = updateResult.code;
 
             if (result.code == OpResult.SUCCESS) {
@@ -292,7 +321,7 @@ public class MetadataResource {
                 LOG.error(result.message);
             }
         } catch (Exception e) {
-            LOG.error("Error: " + e.getMessage(),e);
+            LOG.error("Error: " + e.getMessage(), e);
             result.code = OpResult.FAILURE;
             result.message = e.getMessage();
         }
@@ -350,17 +379,19 @@ public class MetadataResource {
     @Path("/publishmentTypes")
     @GET
     public List<PublishmentType> listPublishmentType() {
-        return dao.listPublishmentType();
+        return PublishementTypeLoader.loadPublishmentTypes();
     }
 
     @Path("/publishmentTypes")
     @POST
+    @Deprecated
     public OpResult addPublishmentType(PublishmentType publishmentType) {
         return dao.addPublishmentType(publishmentType);
     }
 
     @Path("/publishmentTypes/batch")
     @POST
+    @Deprecated
     public List<OpResult> addPublishmentTypes(List<PublishmentType> publishmentTypes) {
         List<OpResult> results = new LinkedList<>();
         for (PublishmentType pubType : publishmentTypes) {
@@ -371,12 +402,14 @@ public class MetadataResource {
 
     @Path("/publishmentTypes/{name}")
     @DELETE
+    @Deprecated
     public OpResult removePublishmentType(@PathParam("name") String name) {
         return dao.removePublishmentType(name);
     }
 
     @Path("/publishmentTypes")
     @DELETE
+    @Deprecated
     public List<OpResult> removePublishmentTypes(List<String> pubTypes) {
         List<OpResult> results = new LinkedList<>();
         for (String pubType : pubTypes) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
new file mode 100644
index 0000000..738c978
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
+
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Pairs a {@link StreamDefinition} with the {@link Kafka2TupleMetadata} describing its source.
+ */
+public class StreamDefinitionWrapper {
+    private Kafka2TupleMetadata streamSource;
+    private StreamDefinition streamDefinition;
+
+    public Kafka2TupleMetadata getStreamSource() {
+        return streamSource;
+    }
+
+    public void setStreamSource(Kafka2TupleMetadata streamSource) {
+        this.streamSource = streamSource;
+    }
+
+    public StreamDefinition getStreamDefinition() {
+        return streamDefinition;
+    }
+
+    public void setStreamDefinition(StreamDefinition streamDefinition) {
+        this.streamDefinition = streamDefinition;
+    }
+
+    /**
+     * Checks that both parts are present and fills in the derived settings.
+     *
+     * <p>The source type defaults to {@code KAFKA}. The data source name is derived from the
+     * stream id as {@code <STREAMID>_CUSTOMIZED} (upper-cased) and written to both the stream
+     * definition and the stream source. A new codec is then attached to the stream source, using
+     * {@code timestamp} as timestamp column and {@link JsonStringStreamNameSelector} as stream
+     * name selector.</p>
+     *
+     * @throws NullPointerException if the stream source or the stream definition is missing
+     */
+    public void validateAndEnsureDefault() {
+        Preconditions.checkNotNull(streamSource, "streamSource");
+        Preconditions.checkNotNull(streamDefinition, "streamDefinition");
+        if (streamSource.getType() == null) {
+            streamSource.setType("KAFKA");
+        }
+        // Locale.ROOT keeps the generated name identical whatever the JVM default locale is.
+        String dataSourceName = (streamDefinition.getStreamId() + "_CUSTOMIZED").toUpperCase(Locale.ROOT);
+        streamDefinition.setDataSource(dataSourceName);
+        streamSource.setName(dataSourceName);
+
+        // NOTE(review): any codec already present on streamSource is replaced here; confirm that is intended.
+        Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+        codec.setTimestampColumn("timestamp");
+        codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName());
+        Properties streamNameSelectorProp = new Properties();
+        streamNameSelectorProp.put("userProvidedStreamName", streamSource.getName());
+        codec.setStreamNameSelectorProp(streamNameSelectorProp);
+        // Normalize a blank timestamp format to null.
+        if (StringUtils.isBlank(codec.getTimestampFormat())) {
+            codec.setTimestampFormat(null);
+        }
+        streamSource.setCodec(codec);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index 2dc7f51..2d2a90f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -75,10 +75,13 @@ public interface IMetadataDao extends Closeable {
 
     OpResult removePublishment(String pubId);
 
+    @Deprecated
     List<PublishmentType> listPublishmentType();
 
+    @Deprecated
     OpResult addPublishmentType(PublishmentType publishmentType);
 
+    @Deprecated
     OpResult removePublishmentType(String pubType);
 
     List<AlertPublishEvent> listAlertPublishEvent(int size);
@@ -190,4 +193,19 @@ public interface IMetadataDao extends Closeable {
         }
         return result;
     }
+
+    /**
+     * Lists the policies that belong to the given site.
+     *
+     * @param siteId site identifier to match; must not be null
+     * @return policies from {@link #listPolicies()} whose siteId equals {@code siteId};
+     *         policies without a siteId are never matched
+     */
+    default List<PolicyDefinition> getPoliciesBySiteId(String siteId) {
+        Preconditions.checkNotNull(siteId,"siteId");
+        // Compare from the non-null argument so policies stored without a siteId do not throw.
+        return listPolicies().stream()
+                .filter(pc -> siteId.equals(pc.getSiteId()))
+                .collect(Collectors.toList());
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index f61a291..2b4180d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -186,9 +186,10 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
                     } else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) {
                         status = ApplicationEntity.Status.STOPPED;
                     } else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) {
-                        status = ApplicationEntity.Status.STOPPED;
+                        status = ApplicationEntity.Status.STOPPING;
                     } else {
                         LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status());
+                        status = ApplicationEntity.Status.UNKNOWN;
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
index 02c3a5e..b5bec1b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
@@ -71,51 +71,60 @@ public class ApplicationStatusUpdateServiceImpl extends ApplicationStatusUpdateS
     }
 
     @Override
-    public void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities) {
-    }
-
-    @Override
     public void updateApplicationEntityStatus(ApplicationEntity applicationEntity) {
         String appUuid = applicationEntity.getUuid();
-        ApplicationEntity.Status currentStatus = applicationEntity.getStatus();
+        ApplicationEntity.Status preStatus = applicationEntity.getStatus();
         try {
-            ApplicationEntity.Status topologyStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid));
-            if (currentStatus == ApplicationEntity.Status.STARTING) {
-                if (topologyStatus == ApplicationEntity.Status.RUNNING) {
-                    applicationEntityService.delete(applicationEntity);
-                    applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
-                    applicationEntityService.create(applicationEntity);
+            ApplicationEntity.Status currentStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid));
+            if (preStatus == ApplicationEntity.Status.STARTING) {
+                if (currentStatus == ApplicationEntity.Status.RUNNING) {
+                    // applicationEntityService.delete(applicationEntity);
+                    // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
+                    // applicationEntityService.create(applicationEntity);
+                    currentStatus = ApplicationEntity.Status.RUNNING;
                     // handle the topology corruption case:
-                } else if (topologyStatus == ApplicationEntity.Status.REMOVED) {
-                    applicationEntityService.delete(applicationEntity);
-                    applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
-                    applicationEntityService.create(applicationEntity);
+                } else if (currentStatus == ApplicationEntity.Status.REMOVED) {
+                    // applicationEntityService.delete(applicationEntity);
+                    // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+                    // applicationEntityService.create(applicationEntity);
+                    currentStatus = ApplicationEntity.Status.INITIALIZED;
                 }
-            } else if (currentStatus == ApplicationEntity.Status.STOPPING) {
-                if (topologyStatus == ApplicationEntity.Status.REMOVED) {
-                    applicationEntityService.delete(applicationEntity);
-                    applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
-                    applicationEntityService.create(applicationEntity);
+            } else if (preStatus == ApplicationEntity.Status.STOPPING) {
+                if (currentStatus == ApplicationEntity.Status.REMOVED) {
+                    // applicationEntityService.delete(applicationEntity);
+                    // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+                    // applicationEntityService.create(applicationEntity);
+                    currentStatus = ApplicationEntity.Status.INITIALIZED;
                 }
-            } else if (currentStatus == ApplicationEntity.Status.RUNNING) {
+            } else if (preStatus == ApplicationEntity.Status.RUNNING) {
                 // handle the topology corruption case:
-                if (topologyStatus == ApplicationEntity.Status.REMOVED) {
-                    applicationEntityService.delete(applicationEntity);
-                    applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
-                    applicationEntityService.create(applicationEntity);
+                if (currentStatus == ApplicationEntity.Status.REMOVED) {
+                    // applicationEntityService.delete(applicationEntity);
+                    // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+                    // applicationEntityService.create(applicationEntity);
+                    currentStatus = ApplicationEntity.Status.INITIALIZED;
                 }
-            } else if (currentStatus == ApplicationEntity.Status.INITIALIZED) {
+            } else if (preStatus == ApplicationEntity.Status.INITIALIZED) {
                 //corner case: when Storm service go down, app status-> initialized,
                 //then when storm server is up again, storm topology will be launched automatically->active
-                if (topologyStatus == ApplicationEntity.Status.RUNNING) {
-                    applicationEntityService.delete(applicationEntity);
-                    applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
-                    applicationEntityService.create(applicationEntity);
+                if (currentStatus == ApplicationEntity.Status.RUNNING) {
+                    // applicationEntityService.delete(applicationEntity);
+                    // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
+                    // applicationEntityService.create(applicationEntity);
+                    currentStatus = ApplicationEntity.Status.RUNNING;
                 }
             }
-            // "STOPPED" is not used in Eagle, so just do nothing.
 
-            applicationEntity.setStatus(topologyStatus);
+            if (currentStatus == ApplicationEntity.Status.REMOVED) {
+                currentStatus = ApplicationEntity.Status.INITIALIZED;
+            }
+
+            // "STOPPED" is not used in Eagle, so just do nothing.
+            if (preStatus != currentStatus) {
+                LOG.info("Application {} status changed from {} to {}", applicationEntity.getAppId(), preStatus, currentStatus);
+            }
+            applicationEntity.setStatus(currentStatus);
+            applicationEntityService.update(applicationEntity);
         } catch (RuntimeException e) {
             LOG.error(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index eff232a..0172498 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Optional;
 
 /**
- * Application Service KafkaStreamMessaging Interface.
+ * Application Service Provider Interface (SPI)
  *
  * @param <T> Application Type.
  */

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index 1b066ef..a5f5a73 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -79,7 +79,8 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
         while (attempt < 10) {
             attempt++;
             statusUpdateService.updateApplicationEntityStatus(applicationEntity);
-            if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED) {
+            if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED
+                    || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) {
                 break;
             } else {
                 try {

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
index 6bc73fc..52b8e79 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
@@ -17,12 +17,20 @@
 package org.apache.eagle.app.test;
 
 import com.google.inject.Guice;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
 import org.junit.Before;
 
 public class ApplicationTestBase {
     private Injector injector;
 
+
+    @Inject
+    ApplicationStatusUpdateService statusUpdateService;
+
     @Before
     public void setUp() {
         injector = Guice.createInjector(new ApplicationTestGuiceModule());
@@ -32,4 +40,33 @@ public class ApplicationTestBase {
     protected Injector injector() {
         return injector;
     }
+
+    /**
+     * Waits until the application status is STOPPED or INITIALIZED.
+     *
+     * <p>The status is checked up to 10 times; after each unsuccessful check it is refreshed
+     * through the status update service and the thread sleeps for one second. The test fails
+     * when the application has still not stopped after the last refresh.</p>
+     *
+     * @param applicationEntity application to wait for
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    protected void awaitApplicationStop(ApplicationEntity applicationEntity) throws InterruptedException {
+        final int maxAttempts = 10;
+        for (int attempt = 0; attempt < maxAttempts && !isStopped(applicationEntity); attempt++) {
+            statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+            Thread.sleep(1000);
+        }
+        // Decide on the final status: an attempt counter bounded by the loop can never exceed its limit.
+        if (!isStopped(applicationEntity)) {
+            Assert.fail("Failed to wait for application to STOPPED after 10 attempts");
+        }
+    }
+
+    /** Tells whether the application has reached a stopped state (STOPPED or INITIALIZED). */
+    private static boolean isStopped(ApplicationEntity applicationEntity) {
+        ApplicationEntity.Status status = applicationEntity.getStatus();
+        return status == ApplicationEntity.Status.STOPPED
+                || status == ApplicationEntity.Status.INITIALIZED;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
index 59925fd..6c68cd2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
@@ -59,7 +59,7 @@ public class ApplicationResourceTest extends ApplicationTestBase {
         statusUpdateService.updateApplicationEntityStatus(applicationEntity);
         // Stop application
         applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
-        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        awaitApplicationStop(applicationEntity);
         // Uninstall application
         applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
         try {

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 6ab250e..2b72f44 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -105,6 +105,10 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
new file mode 100644
index 0000000..facf07a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.common.utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Holds the process-wide {@link Reflections} scanner; packages come from config key "scanPackages" (comma-separated). */
+public class ReflectionsHelper {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReflectionsHelper.class);
+    private static final String DEFAULT_PACKAGE = "org.apache.eagle";
+    // Keep below LOGGER: static initializers run in textual order and the constructor logs.
+    private static final ReflectionsHelper INSTANCE = new ReflectionsHelper();
+    private final Reflections reflections;
+
+    private ReflectionsHelper() {
+        Config config = ConfigFactory.load();
+        // Trim around commas so "a.b, c.d" does not yield the unmatched package prefix " c.d".
+        String[] packages = config.hasPath("scanPackages")
+            ? config.getString("scanPackages").trim().split("\\s*,\\s*") : new String[] {DEFAULT_PACKAGE};
+        // Join explicitly: SLF4J treats a bare String[] as the varargs array and prints only its first element.
+        LOGGER.info("Scanning packages: {}", String.join(",", packages));
+        this.reflections = new Reflections(packages);
+    }
+
+    /** Returns the shared scanner, built once when this class is initialized. */
+    public static Reflections getInstance() {
+        return INSTANCE.reflections;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
index 84661e9..0aeac2c 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
+@Deprecated
 public class EmbeddedHbase {
     private HBaseTestingUtility util;
     private MiniHBaseCluster hbaseCluster;
@@ -58,7 +61,7 @@ public class EmbeddedHbase {
         return getInstance(null);
     }
 
-    private EmbeddedHbase() {
+    public EmbeddedHbase() {
         this(DEFAULT_PORT, DEFAULT_ZNODE);
     }
 
@@ -115,7 +118,7 @@ public class EmbeddedHbase {
     public void createTable(String tableName, String cf) {
         try {
             util.createTable(tableName, cf);
-        } catch (Exception ex) {
+        } catch (IOException ex) {
             LOG.warn("Create table failed, probably table already existed, table name: " + tableName);
         }
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
index e65f062..ee9d32d 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
@@ -18,9 +18,11 @@ package org.apache.eagle.service.hbase;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class EmbeddedHbaseTest extends TestHBaseBase {
     @Test
-    public void testHBaseCreateTable() {
-        // hbase.createTable("test_hbase_table","f");
+    public void testHBaseCreateTable() throws IOException {
+        hbase.createTable("test_hbase_table","f");
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
index 31af2a1..35c0a38 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
@@ -17,30 +17,48 @@
 package org.apache.eagle.service.hbase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-
-@Ignore
 public class TestHBaseBase {
-    protected static EmbeddedHbase hbase;
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestHBaseBase.class);
+    protected static HBaseTestingUtility hbase;
 
-    @BeforeClass
-    public static void setUpHBase() {
-        hbase = EmbeddedHbase.getInstance();
+    protected static String getZkZnodeParent() {
+        return "/hbase-test";
     }
 
-    public static void setupHBaseWithConfig(Configuration config) {
-        Assert.assertTrue("HBase test mini cluster should not start", null == hbase);
-        hbase = EmbeddedHbase.getInstance(config);
+    @BeforeClass
+    public static void setUpHBase() {
+        Configuration conf = HBaseConfiguration.create();
+        conf.set("zookeeper.znode.parent", getZkZnodeParent());
+        conf.setInt("hbase.master.info.port", -1); // avoid port clobbering
+        conf.setInt("hbase.regionserver.info.port", -1); // avoid port clobbering
+        hbase = new HBaseTestingUtility(conf);
+        try {
+            hbase.startMiniCluster();
+        } catch (Exception ex) {
+            LOGGER.error("Error to start hbase mini cluster: " + ex.getMessage(), ex);
+            throw new IllegalStateException(ex);
+        }
+        System.setProperty("storage.hbase.autoCreateTable", "false");
+        System.setProperty("storage.hbase.zookeeperZnodeParent", getZkZnodeParent());
+        System.setProperty("storage.hbase.zookeeperPropertyClientPort", String.valueOf(hbase.getZkCluster().getClientPort())); // mini cluster's ZooKeeper port
+    }
 
     @AfterClass
     public static void shutdownHBase() {
-        if (hbase != null) {
-            hbase.shutdown();
+        try {
+            hbase.shutdownMiniCluster();
+        } catch (Exception e) {
+            LOGGER.error("Error to shutdown mini hbase cluster: " + e.getMessage(),e);
+        } finally {
+            hbase = null;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
index 66772ac..d725614 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
@@ -22,7 +22,5 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
 import java.util.Collection;
 
 public abstract class ApplicationStatusUpdateService extends AbstractScheduledService {
-    public abstract void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities);
-
     public abstract void updateApplicationEntityStatus(ApplicationEntity applicationEntity);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
index 0411b90..7c79b39 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
@@ -19,6 +19,7 @@ package org.apache.eagle.service.client.impl;
 import com.sun.jersey.api.client.WebResource;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.EagleServiceClientException;
@@ -65,11 +66,13 @@ public class EagleServiceClientImpl extends EagleServiceBaseClient {
             try {
                 return config.getInt(SERVICE_PORT_KEY);
             } catch (ConfigException.WrongType wrongType) {
-                return Integer.valueOf(config.getString(SERVICE_PORT_KEY));
+                String portStr = config.getString(SERVICE_PORT_KEY);
+                if (StringUtils.isNotBlank(portStr)) {
+                    return Integer.valueOf(portStr);
+                }
             }
-        } else {
-            return 9090;
         }
+        return 9090;
     }
 
     public EagleServiceClientImpl(String host, int port, String username, String password) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
index ac16b93..adfd2e2 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
@@ -19,7 +19,7 @@ package org.apache.eagle.service.client;
 import org.apache.eagle.service.hbase.EmbeddedHbase;
 
 public class ClientTestBase {
-	
+
 	//protected static EmbeddedServer server;
 	protected static EmbeddedHbase hbase;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/pom.xml b/eagle-core/eagle-query/eagle-entity-base/pom.xml
index f887714..fd2300c 100755
--- a/eagle-core/eagle-query/eagle-entity-base/pom.xml
+++ b/eagle-core/eagle-query/eagle-entity-base/pom.xml
@@ -31,20 +31,20 @@
 
     <dependencies>
         <!-- put extcos dependency at the top for using asm 4.0 jar !-->
-        <dependency>
-            <groupId>net.sf.extcos</groupId>
-            <artifactId>extcos</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.ow2.asm</groupId>
-                    <artifactId>asm-all</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-all</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>net.sf.extcos</groupId>-->
+            <!--<artifactId>extcos</artifactId>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<groupId>org.ow2.asm</groupId>-->
+                    <!--<artifactId>asm-all</artifactId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.ow2.asm</groupId>-->
+            <!--<artifactId>asm-all</artifactId>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-common</artifactId>

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
index 7065cbe..8ccee87 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
@@ -16,55 +16,67 @@
  */
 package org.apache.eagle.log.entity.repo;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.eagle.common.utils.ReflectionsHelper;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.EntitySerDeser;
-import net.sf.extcos.ComponentQuery;
-import net.sf.extcos.ComponentScanner;
-
 import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.Map;
+
 public final class EntityRepositoryScanner {
 
-	private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class);
+    private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class);
 
-	public static void scan() throws InstantiationException, IllegalAccessException {
-		// TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is 
-		// conflicted with jersey server 1.8. We should fix it later
-		LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\"");
-		final ComponentScanner scanner = new ComponentScanner();
-		final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() );
-		for (Class<?> entityClass : classes) {
-			LOG.info("Processing entity repository: " + entityClass.getName());
-			if (EntityRepository.class.isAssignableFrom(entityClass)) {
-				EntityRepository repo = (EntityRepository)entityClass.newInstance();
-				addRepo(repo);
-			}
-		}
-	}
+    //    public static void scan() throws InstantiationException, IllegalAccessException {
+    //        // TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is
+    //        // conflicted with jersey server 1.8. We should fix it later
+    //        LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\"");
+    //        final ComponentScanner scanner = new ComponentScanner();
+    //        final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() );
+    //        for (Class<?> entityClass : classes) {
+    //            LOG.info("Processing entity repository: " + entityClass.getName());
+    //            if (EntityRepository.class.isAssignableFrom(entityClass)) {
+    //                EntityRepository repo = (EntityRepository)entityClass.newInstance();
+    //                addRepo(repo);
+    //            }
+    //        }
+    //    }
 
-	private static void addRepo(EntityRepository repo) {
-		final Map<Class<?>, EntitySerDeser<?>> serDeserMap = repo.getSerDeserMap();
-		for (Map.Entry<Class<?>, EntitySerDeser<?>> entry : serDeserMap.entrySet()) {
-			EntityDefinitionManager.registerSerDeser(entry.getKey(), entry.getValue());
-		}
-		final Collection<Class<? extends TaggedLogAPIEntity>> entityClasses = repo.getEntitySet();
-		for (Class<? extends TaggedLogAPIEntity> clazz : entityClasses) {
-			EntityDefinitionManager.registerEntity(clazz);
-		}
-	}
+    /** Registers every concrete {@link EntityRepository} subtype found by the shared scanner; abstract subtypes cannot be instantiated and are skipped. */
+    public static void scan() throws IllegalAccessException, InstantiationException {
+        LOG.info("Scanning all entity repositories");
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        for (Class<? extends EntityRepository> entityRepoClass : ReflectionsHelper.getInstance().getSubTypesOf(EntityRepository.class)) {
+            if (!java.lang.reflect.Modifier.isAbstract(entityRepoClass.getModifiers())) {
+                addRepo(entityRepoClass.newInstance());
+            }
+        }
+        stopWatch.stop();
+        LOG.info("Finished scanning entity repositories in {} ms", stopWatch.getTime());
+    }
 
-	public static class EntityRepoScanQuery extends ComponentQuery {
+    private static void addRepo(EntityRepository repo) {
+        // Feed one repository into EntityDefinitionManager:
+        // first its SerDeser mappings, then its entity classes.
+        for (Map.Entry<Class<?>, EntitySerDeser<?>> serDeser : repo.getSerDeserMap().entrySet()) {
+            EntityDefinitionManager.registerSerDeser(serDeser.getKey(), serDeser.getValue());
+        }
+        for (Class<? extends TaggedLogAPIEntity> entityClass : repo.getEntitySet()) {
+            EntityDefinitionManager.registerEntity(entityClass);
+        }
+    }
 
-		@Override
-		protected void query() {
-			select().from("org.apache.eagle").returning(
-			allExtending(EntityRepository.class));
-		}
-	}
+    //    public static class EntityRepoScanQuery extends ComponentQuery {
+    //
+    //        @Override
+    //        protected void query() {
+    //            select().from("org.apache.eagle").returning(
+    //            allExtending(EntityRepository.class));
+    //        }
+    //   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
index 1e9e6cb..33aee32 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
@@ -26,19 +26,24 @@ import org.apache.eagle.log.entity.test.TestLogAPIEntity;
 import org.apache.eagle.query.parser.EagleQueryParser;
 import org.apache.eagle.service.hbase.TestHBaseBase;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
 public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
 
-    @Test
-    public void testUniqueIndexRead() throws Exception {
+    @BeforeClass
+    public static void createTable() throws IOException, IllegalAccessException, InstantiationException {
         EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
         hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
+    }
 
+    @Test
+    public void testUniqueIndexRead() throws Exception {
         EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
         final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
         
@@ -95,7 +100,6 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
         indexReader = new UniqueIndexStreamReader(indexDef, condition);
         batchReader = new GenericEntityBatchReader(indexReader);
         entities =  batchReader.read();
-        hbase.deleteTable(entityDefinition.getTable());
         Assert.assertNotNull(entities);
         Assert.assertTrue(entities.isEmpty());
     }
@@ -103,7 +107,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
     @Test
     public void testNonClusterIndexRead() throws Exception {
         EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
-        hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
+        // hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
 
         EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
         final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
@@ -162,7 +166,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
         indexReader = new NonClusteredIndexStreamReader(indexDef, condition);
         batchReader = new GenericEntityBatchReader(indexReader);
         entities =  batchReader.read();
-        hbase.deleteTable(entityDefinition.getTable());
+        // hbase.deleteTable(entityDefinition.getTable());
         Assert.assertNotNull(entities);
         Assert.assertTrue(entities.isEmpty());
     }