You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/02 10:36:22 UTC

[2/2] incubator-eagle git commit: [EAGLE-811] Refactor jdbcMetadataDaoImpl of alert engine metadata

[EAGLE-811] Refactor jdbcMetadataDaoImpl of alert engine metadata

* Tickets
https://issues.apache.org/jira/browse/EAGLE-811
https://issues.apache.org/jira/browse/EAGLE-808

* fix a bug 'alertId is null' in email publisher
* improve UnitAlertApplication config

Author: Zhao, Qingwen <qi...@apache.org>

Closes #705 from qingwen220/EAGLE-811.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/30e35de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/30e35de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/30e35de6

Branch: refs/heads/master
Commit: 30e35de60c532502731ef5b51360df15c0026397
Parents: aef7ea3
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Dec 2 18:36:14 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Dec 2 18:36:14 2016 +0800

----------------------------------------------------------------------
 ...e.alert.app.AlertUnitTopologyAppProvider.xml |  47 +-
 .../src/test/resource/application.conf          |   9 +-
 .../alert/engine/model/AlertStreamEvent.java    |   1 +
 .../eagle/alert/utils/AlertConstants.java       |   2 +
 .../src/test/resources/application.conf         |   9 +-
 .../eagle/alert/engine/StreamContext.java       |  16 +
 .../publisher/email/AlertEmailGenerator.java    |   7 +-
 .../email/AlertEmailGeneratorBuilder.java       |   4 +-
 .../publisher/email/AlertEmailSender.java       |  44 +-
 .../publisher/impl/AlertEmailPublisher.java     |  47 +-
 .../src/main/resources/application.conf         |   9 +-
 .../publisher/AlertEmailPublisherTest.java      |   4 +-
 .../src/test/resources/application-test.conf    |  31 +-
 .../metadata/resource/MetadataResource.java     |  76 +--
 .../eagle/alert/metadata/IMetadataDao.java      |  77 ++-
 .../metadata/impl/InMemMetadataDaoImpl.java     |   2 +-
 .../metadata/impl/JdbcDatabaseHandler.java      | 399 ---------------
 .../metadata/impl/JdbcMetadataDaoImpl.java      |  61 ++-
 .../metadata/impl/JdbcMetadataHandler.java      | 506 +++++++++++++++++++
 .../metadata/impl/MongoMetadataDaoImpl.java     |   2 +-
 .../eagle/alert/metadata/impl/InMemoryTest.java |   2 +-
 .../eagle/alert/metadata/impl/JdbcImplTest.java | 103 +++-
 .../alert-metadata/src/test/resources/init.sql  |  79 +--
 .../src/main/resources/application.conf         |   9 +-
 .../eagle/app/service/ApplicationAction.java    |  13 +-
 .../app/service/ApplicationActionTest.java      |   9 +-
 .../src/test/resources/application.conf         |   9 +-
 .../eagle/common/TestSerializableUtils.java     |   6 +-
 .../eagle/metadata/model/AlertEntity.java       |   1 -
 .../src/test/resources/application-test.xml     |   1 +
 .../src/main/bin/createTables.sql               |  78 ++-
 eagle-server-assembly/src/main/conf/eagle.conf  |   7 +
 .../src/main/resources/application.conf         |   9 +-
 .../app/dev/public/js/services/policySrv.js     |   4 +-
 34 files changed, 1022 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 8ee8b6b..74e97d3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -76,76 +76,39 @@
         <!-- alert spout configuration -->
         <property>
             <name>spout.kafkaBrokerZkQuorum</name>
-            <displayName>Kafka Zookeeper Quorum</displayName>
+            <displayName>Kafka Spout Broker Zookeeper Quorum</displayName>
             <value>localhost:2181</value>
             <description>Zookeeper quorum of kafka broker for spout to consume data</description>
             <required>true</required>
         </property>
         <property>
             <name>spout.kafkaBrokerZkBasePath</name>
-            <displayName>Kafka Zookeeper Root</displayName>
+            <displayName>Kafka Spout Broker Zookeeper Root</displayName>
             <value>/brokers</value>
             <description>Zookeeper znode path for kafka brokers</description>
             <required>false</required>
         </property>
         <property>
             <name>spout.stormKafkaUseSameZkQuorumWithKafkaBroker</name>
-            <displayName>Reuse Kafka Zookeeper</displayName>
+            <displayName>Spout Transaction Zookeeper to Reuse Broker Zookeeper</displayName>
             <value>true</value>
             <description>Use same zookeeper for kafka server and kafka consumer(Storm-Kafka)</description>
             <required>false</required>
         </property>
         <property>
             <name>spout.stormKafkaTransactionZkPath</name>
-            <displayName>Kafka Transaction ZkPath</displayName>
+            <displayName>Spout Transaction Zookeeper Path</displayName>
             <value>/consumers</value>
             <description>Zookeeper path for storm kafka transaction</description>
             <required>false</required>
         </property>
         <property>
             <name>spout.stormKafkaEagleConsumer</name>
-            <displayName>Kafka Consumer ID</displayName>
+            <displayName>Spout Consumer ID</displayName>
             <value>eagle_consumer</value>
             <description>Zookeeper quorum for spout to consume data</description>
             <required>true</required>
         </property>
-
-        <!-- zk config for alert engine -->
-        <property>
-            <name>zkConfig.zkQuorum</name>
-            <displayName>Coordinator Zookeeper Quorum</displayName>
-            <value>localhost:2181</value>
-            <description>Zookeeper quorum for alert engine</description>
-            <required>true</required>
-        </property>
-        <property>
-            <name>zkConfig.zkRoot</name>
-            <displayName>Coordinator Zookeeper Root</displayName>
-            <value>/alert</value>
-            <description>Zookeeper znode path for alert engine</description>
-            <required>false</required>
-        </property>
-        <property>
-            <name>metadataService.context</name>
-            <displayName>Metadata Service Context Path</displayName>
-            <value>/rest</value>
-            <description>Metadata service context path</description>
-            <required>false</required>
-        </property>
-        <property>
-            <name>metadataService.host</name>
-            <displayName>Metadata Service Host</displayName>
-            <value>localhost</value>
-            <description>Metadata service host</description>
-            <required>true</required>
-        </property>
-        <property>
-            <name>metadataService.port</name>
-            <displayName>Metadata Service Port</displayName>
-            <value>9090</value>
-            <description>Metadata service port</description>
-            <required>true</required>
-        </property>
     </configuration>
     <docs>
         <install>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
index cfb2d47..6c15b1b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
@@ -46,7 +46,14 @@
     "metadataService": {
       "host": "localhost",
       "port": 8080,
-      "context": "/rest"
+      "context": "/rest",
+      mailSmtpServer = "",
+      mailSmtpPort = 25,
+      mailSmtpAuth = "false"
+      //mailSmtpConn = "plaintext",
+      //mailSmtpUsername = ""
+      //mailSmtpPassword = ""
+      //mailSmtpDebug = false
     }
     "metadataDynamicCheck": {
       "initDelayMillis": 1000,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index c0a709d..b7f0132 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -123,6 +123,7 @@ public class AlertStreamEvent extends StreamEvent {
     }
 
     public String getAlertId() {
+        ensureAlertId();
         return alertId;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
index 566944f..ee2c28c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -26,4 +26,6 @@ public class AlertConstants {
     public static final String DEFAULT_ROUTERBOLT_NAME = "streamRouterBolt";
 
     public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
+
+    public static final String COORDINATOR = "coordinator";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 72b8012..5d4da38 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -32,7 +32,14 @@
     "metadataService": {
       "host": "localhost",
       "port": 8080,
-      "context": "/rest"
+      "context": "/rest",
+      mailSmtpServer = "localhost",
+      mailSmtpPort = 25,
+      mailSmtpAuth = "false"
+      //mailSmtpConn = "plaintext",
+      //mailSmtpUsername = ""
+      //mailSmtpPassword = ""
+      //mailSmtpDebug = false
     }
     "metadataDynamicCheck": {
       "initDelayMillis": 1000,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
index bafba83..c2d5f2e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.eagle.alert.engine;
 
 import com.typesafe.config.Config;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 8a69c37..809bb09 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.*;
 
 public class AlertEmailGenerator {
@@ -43,7 +44,7 @@ public class AlertEmailGenerator {
     private String serverHost = "localhost";
     private int serverPort = 80;
 
-    private Map<String, Object> properties;
+    private Properties properties;
 
     private ThreadPoolExecutor executorPool;
 
@@ -173,11 +174,11 @@ public class AlertEmailGenerator {
         this.subject = subject;
     }
 
-    public Map<String, Object> getProperties() {
+    public Properties getProperties() {
         return properties;
     }
 
-    public void setProperties(Map<String, Object> properties) {
+    public void setProperties(Properties properties) {
         this.properties = properties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
index b018d5c..f6debab 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -17,7 +17,7 @@
  */
 package org.apache.eagle.alert.engine.publisher.email;
 
-import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
 
 public class AlertEmailGeneratorBuilder {
@@ -51,7 +51,7 @@ public class AlertEmailGeneratorBuilder {
         return this;
     }
 
-    public AlertEmailGeneratorBuilder withMailProps(Map<String, Object> mailProps) {
+    public AlertEmailGeneratorBuilder withMailProps(Properties mailProps) {
         generator.setProperties(mailProps);
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
index 1152d24..f573215 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -43,10 +43,7 @@ public class AlertEmailSender implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
     private static final int MAX_RETRY_COUNT = 3;
 
-
-    private Map<String, Object> mailProps;
-
-
+    private Properties mailProps;
     private String threadName;
 
     /**
@@ -73,45 +70,11 @@ public class AlertEmailSender implements Runnable {
         LOG.info("Initialized " + threadName + ": origin is : " + this.origin + ", recipient of the email: " + this.recipients + ", velocity TPL file: " + this.configFileName);
     }
 
-    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, Object> mailProps) {
+    public AlertEmailSender(AlertEmailContext alertEmail, Properties mailProps) {
         this(alertEmail);
         this.mailProps = mailProps;
     }
 
-    private Properties parseMailClientConfig(Map<String, Object> mailProps) {
-        if (mailProps == null) {
-            return null;
-        }
-        Properties props = new Properties();
-        String mailHost = (String) mailProps.get(AlertEmailConstants.CONF_MAIL_HOST);
-        String mailPort = (String) mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
-        if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
-            LOG.warn("SMTP server is unset, will exit");
-            return null;
-        }
-        props.put(AlertEmailConstants.CONF_MAIL_HOST, mailHost);
-        props.put(AlertEmailConstants.CONF_MAIL_PORT, mailPort);
-
-        String smtpAuth = (String) mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
-        props.put(AlertEmailConstants.CONF_MAIL_AUTH, smtpAuth);
-        if (Boolean.parseBoolean(smtpAuth)) {
-            props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
-            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
-        }
-
-        String smtpConn = (String) mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, AlertEmailConstants.CONN_PLAINTEXT);
-        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
-            props.put("mail.smtp.starttls.enable", "true");
-        }
-        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
-            props.put("mail.smtp.socketFactory.port", "465");
-            props.put("mail.smtp.socketFactory.class",
-                "javax.net.ssl.SSLSocketFactory");
-        }
-        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
-        return props;
-    }
-
     @Override
     public void run() {
         int count = 0;
@@ -121,8 +84,7 @@ public class AlertEmailSender implements Runnable {
             try {
                 final EagleMailClient client;
                 if (mailProps != null) {
-                    Properties props = parseMailClientConfig(mailProps);
-                    client = new EagleMailClient(props);
+                    client = new EagleMailClient(mailProps);
                 } else {
                     client = new EagleMailClient();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 40237ee..7431d35 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -21,6 +21,7 @@ package org.apache.eagle.alert.engine.publisher.impl;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
 import com.typesafe.config.Config;
@@ -32,10 +33,13 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
+
 public class AlertEmailPublisher extends AbstractPublishPlugin {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
@@ -43,12 +47,21 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
     private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
     private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
 
+    private static final String EAGLE_CORRELATION_SMTP_SERVER = "metadataService.mailSmtpServer";
+    private static final String EAGLE_CORRELATION_SMTP_PORT = "metadataService.mailSmtpPort";
+    private static final String EAGLE_CORRELATION_SMTP_CONN = "metadataService.mailSmtpConn";
+    private static final String EAGLE_CORRELATION_SMTP_AUTH = "metadataService.mailSmtpAuth";
+    private static final String EAGLE_CORRELATION_SMTP_USERNAME = "metadataService.mailSmtpUsername";
+    private static final String EAGLE_CORRELATION_SMTP_PASSWORD = "metadataService.mailSmtpPassword";
+    private static final String EAGLE_CORRELATION_SMTP_DEBUG = "metadataService.mailSmtpDebug";
+
     private AlertEmailGenerator emailGenerator;
     private Map<String, Object> emailConfig;
 
     private transient ThreadPoolExecutor executorPool;
     private String serverHost;
     private int serverPort;
+    private Properties mailClientProperties;
 
     @Override
     @SuppressWarnings("rawtypes")
@@ -58,6 +71,7 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
             ? config.getString(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) : "localhost";
         this.serverPort = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT)
             ? config.getInt(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) : 80;
+        this.mailClientProperties = parseMailClientConfig(config);
 
         executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         LOG.info(" Creating Email Generator... ");
@@ -65,7 +79,37 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
             emailConfig = new HashMap<>(publishment.getProperties());
             emailGenerator = createEmailGenerator(emailConfig);
         }
+    }
+
+    private Properties parseMailClientConfig(Config config) {
+        Properties props = new Properties();
+        String mailSmtpServer = config.getString(EAGLE_CORRELATION_SMTP_SERVER);
+        String mailSmtpPort = config.getString(EAGLE_CORRELATION_SMTP_PORT);
+        String mailSmtpAuth =  config.getString(EAGLE_CORRELATION_SMTP_AUTH);
+
+        props.put(AlertEmailConstants.CONF_MAIL_HOST, mailSmtpServer);
+        props.put(AlertEmailConstants.CONF_MAIL_PORT, mailSmtpPort);
+        props.put(AlertEmailConstants.CONF_MAIL_AUTH, mailSmtpAuth);
+
+        if (Boolean.parseBoolean(mailSmtpAuth)) {
+            String mailSmtpUsername = config.getString(EAGLE_CORRELATION_SMTP_USERNAME);
+            String mailSmtpPassword = config.getString(EAGLE_CORRELATION_SMTP_PASSWORD);
+            props.put(AlertEmailConstants.CONF_AUTH_USER, mailSmtpUsername);
+            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailSmtpPassword);
+        }
 
+        String mailSmtpConn = config.hasPath(EAGLE_CORRELATION_SMTP_CONN) ? config.getString(EAGLE_CORRELATION_SMTP_CONN) : AlertEmailConstants.CONN_PLAINTEXT;
+        String mailSmtpDebug = config.hasPath(EAGLE_CORRELATION_SMTP_DEBUG) ? config.getString(EAGLE_CORRELATION_SMTP_DEBUG) : "false";
+        if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
+            props.put("mail.smtp.starttls.enable", "true");
+        }
+        if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
+            props.put("mail.smtp.socketFactory.port", "465");
+            props.put("mail.smtp.socketFactory.class",
+                    "javax.net.ssl.SSLSocketFactory");
+        }
+        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailSmtpDebug);
+        return props;
     }
 
     @Override
@@ -126,8 +170,9 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
             LOG.warn("email sender or recipients is null");
             return null;
         }
+
         AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder()
-            .withMailProps(notificationConfig)
+            .withMailProps(this.mailClientProperties)
             .withSubject(subject)
             .withSender(sender)
             .withRecipients(recipients)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index 754c00b..b151e99 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -38,7 +38,14 @@
   "metadataService": {
     "context": "/rest",
     "host": "localhost",
-    "port": 9090
+    "port": 9090,
+    mailSmtpServer = "",
+    mailSmtpPort = 25,
+    mailSmtpAuth = "false"
+    //mailSmtpConn = "plaintext",
+    //mailSmtpUsername = ""
+    //mailSmtpPassword = ""
+    //mailSmtpDebug = false
   },
   "metric": {
     "sink": {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
index 3f3141a..1f131a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
@@ -39,7 +39,7 @@ public class AlertEmailPublisherTest {
 
     @Before
     public void setUp(){
-        config = ConfigFactory.load();
+        config = ConfigFactory.load("application-test.conf");
         server = SimpleSmtpServer.start(SMTP_PORT);
     }
 
@@ -57,8 +57,6 @@ public class AlertEmailPublisherTest {
         properties.put(PublishConstants.SUBJECT,EMAIL_PUBLISHER_TEST_POLICY);
         properties.put(PublishConstants.SENDER,"eagle@localhost");
         properties.put(PublishConstants.RECIPIENTS,"somebody@localhost");
-        properties.put(AlertEmailConstants.CONF_MAIL_HOST,"localhost");
-        properties.put(AlertEmailConstants.CONF_MAIL_PORT,String.valueOf(SMTP_PORT));
         Publishment publishment = new Publishment();
         publishment.setName("testEmailPublishment");
         publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
index dc016f4..3573507 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
@@ -48,21 +48,28 @@
   "metadataService": {
     "context": "/api",
     "host": "localhost",
-    "port": 8080
+    "port": 8080,
+    mailSmtpServer = "localhost",
+    mailSmtpPort = 5025,
+    mailSmtpAuth = "false"
+    //mailSmtpConn = "plaintext",
+    //mailSmtpUsername = ""
+    //mailSmtpPassword = ""
+    //mailSmtpDebug = false
   },
   "metric": {
     "sink": {
-      //      "kafka": {
-      //        "topic": "alert_metric_test"
-      //        "bootstrap.servers": "localhost:9092"
-      //      }
-      "logger": {
-        "level": "INFO"
-      }
-      "elasticsearch": {
-        "hosts": ["localhost:9200"]
-        "index": "alert_metric_test"
-      }
+//      "kafka": {
+//        "topic": "alert_metric_test"
+//        "bootstrap.servers": "localhost:9092"
+//      }
+//      "logger": {
+//          "level": "INFO"
+//      }
+//      "elasticsearch": {
+//          "hosts": ["localhost:9200"]
+//          "index": "alert_metric_test"
+//      }
     }
   },
   "connection": "mongodb://localhost:27017"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 751853c..c814252 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.service.metadata.resource;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -32,12 +30,12 @@ import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.stream.Collectors;
 import javax.ws.rs.*;
 
 /**
@@ -253,86 +251,26 @@ public class MetadataResource {
     @GET
     public List<AlertPublishEvent> getAlertPublishEventByPolicyId(@PathParam("policyId") String policyId,
                                                                   @QueryParam("size") int size) {
-        return dao.getAlertPublishEventByPolicyId(policyId, size);
+        return dao.getAlertPublishEventsByPolicyId(policyId, size);
     }
 
     @Path("/policies/{policyId}/publishments")
     @GET
     public List<Publishment> getPolicyPublishments(@PathParam("policyId") String policyId) {
-        return dao.listPublishment().stream().filter(ps ->
-            ps.getPolicyIds() != null && ps.getPolicyIds().contains(policyId)
-        ).collect(Collectors.toList());
+        return dao.getPublishmentsByPolicyId(policyId);
     }
 
     @Path("/policies/{policyId}/publishments")
     @POST
     public OpResult addPublishmentsToPolicy(@PathParam("policyId") String policyId, List<String> publishmentIds) {
-        OpResult result = new OpResult();
-        if (publishmentIds == null || publishmentIds.size() == 0) {
-            result.code = OpResult.FAILURE;
-            result.message = "Failed to add policy, there is no publisher in it";
-            return result;
-        }
-        try {
-            getPolicyByID(policyId);
-            Map<String,Publishment> publishmentMap = new HashMap<>();
-            listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub));
-            for (String publishmentId : publishmentIds) {
-                if (publishmentMap.containsKey(publishmentId)) {
-                    Publishment publishment = publishmentMap.get(publishmentId);
-                    if (publishment.getPolicyIds() == null) {
-                        publishment.setPolicyIds(new ArrayList<>());
-                    }
-                    if (publishment.getPolicyIds().contains(policyId)) {
-                        LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId);
-                    } else {
-                        publishment.getPolicyIds().add(policyId);
-                    }
-                    OpResult opResult = addPublishment(publishment);
-                    if (opResult.code == OpResult.FAILURE) {
-                        LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message);
-                        return opResult;
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(opResult.message);
-                        }
-                    }
-                } else {
-                    throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
-                }
-            }
-
-            //for other publishments, remove policyId from them, work around, we should refactor
-            for (String publishmentId : publishmentMap.keySet()) {
-                if (publishmentIds.contains(publishmentId)) {
-                    continue;
-                }
-                Publishment publishment = publishmentMap.get(publishmentId);
-                if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
-                    publishment.getPolicyIds().remove(policyId);
-                    OpResult opResult = addPublishment(publishment);
-                    if (opResult.code == OpResult.FAILURE) {
-                        LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
-                        return opResult;
-                    }
-                }
-            }
-            result.code = OpResult.SUCCESS;
-            result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
-            LOG.info(result.message);
-        } catch (Exception ex) {
-            result.code = OpResult.FAILURE;
-            result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage();
-            LOG.error(result.message,ex);
-        }
-        return result;
+        return dao.addPublishmentsToPolicy(policyId, publishmentIds);
     }
 
     @Path("/policies/{policyId}")
     @GET
-    public List<PolicyDefinition> getPolicyByID(@PathParam("policyId") String policyId) {
+    public PolicyDefinition getPolicyById(@PathParam("policyId") String policyId) {
         Preconditions.checkNotNull(policyId, "policyId");
-        return dao.listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).collect(Collectors.toList());
+        return dao.getPolicyById(policyId);
     }
 
     @Path("/policies/{policyId}/status/{status}")
@@ -340,7 +278,7 @@ public class MetadataResource {
     public OpResult updatePolicyStatusByID(@PathParam("policyId") String policyId, @PathParam("status") PolicyDefinition.PolicyStatus status) {
         OpResult result = new OpResult();
         try {
-            PolicyDefinition policyDefinition = getPolicyByID(policyId).get(0);
+            PolicyDefinition policyDefinition = getPolicyById(policyId);
             policyDefinition.setPolicyStatus(status);
             OpResult updateResult  = addPolicy(policyDefinition);
             result.code = updateResult.code;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index c5221c2..2dc7f51 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.alert.metadata;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -30,7 +31,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public interface IMetadataDao extends Closeable {
 
@@ -68,7 +73,6 @@ public interface IMetadataDao extends Closeable {
 
     OpResult addPublishment(Publishment publishment);
 
-
     OpResult removePublishment(String pubId);
 
     List<PublishmentType> listPublishmentType();
@@ -81,7 +85,7 @@ public interface IMetadataDao extends Closeable {
 
     AlertPublishEvent getAlertPublishEvent(String alertId);
 
-    List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size);
+    List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size);
 
     OpResult addAlertPublishEvent(AlertPublishEvent event);
 
@@ -112,11 +116,78 @@ public interface IMetadataDao extends Closeable {
 
     Logger LOG = LoggerFactory.getLogger(IMetadataDao.class);
 
-    default PolicyDefinition getPolicyByID(String policyId) {
+    default PolicyDefinition getPolicyById(String policyId) {
         Preconditions.checkNotNull(policyId,"policyId");
         return listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).findAny().orElseGet(() -> {
             LOG.error("Policy (policyId " + policyId + ") not found");
             throw new IllegalArgumentException("Policy (policyId " + policyId + ") not found");
         });
     }
+
+    default List<Publishment> getPublishmentsByPolicyId(String policyId) {
+        return listPublishment().stream().filter(ps ->
+                ps.getPolicyIds() != null && ps.getPolicyIds().contains(policyId)
+        ).collect(Collectors.toList());
+    }
+
+    default OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
+        OpResult result = new OpResult();
+        if (publishmentIds == null || publishmentIds.size() == 0) {
+            result.code = OpResult.FAILURE;
+            result.message = "Failed to add policy, there is no publisher in it";
+            return result;
+        }
+        try {
+            Map<String,Publishment> publishmentMap = new HashMap<>();
+            listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub));
+            for (String publishmentId : publishmentIds) {
+                if (publishmentMap.containsKey(publishmentId)) {
+                    Publishment publishment = publishmentMap.get(publishmentId);
+                    if (publishment.getPolicyIds() == null) {
+                        publishment.setPolicyIds(new ArrayList<>());
+                    }
+                    if (publishment.getPolicyIds().contains(policyId)) {
+                        LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId);
+                    } else {
+                        publishment.getPolicyIds().add(policyId);
+                    }
+                    OpResult opResult = addPublishment(publishment);
+                    if (opResult.code == OpResult.FAILURE) {
+                        LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message);
+                        return opResult;
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(opResult.message);
+                        }
+                    }
+                } else {
+                    throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
+                }
+            }
+
+            //for other publishments, remove policyId from them, work around, we should refactor
+            for (String publishmentId : publishmentMap.keySet()) {
+                if (publishmentIds.contains(publishmentId)) {
+                    continue;
+                }
+                Publishment publishment = publishmentMap.get(publishmentId);
+                if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
+                    publishment.getPolicyIds().remove(policyId);
+                    OpResult opResult = addPublishment(publishment);
+                    if (opResult.code == OpResult.FAILURE) {
+                        LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
+                        return opResult;
+                    }
+                }
+            }
+            result.code = OpResult.SUCCESS;
+            result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
+            LOG.info(result.message);
+        } catch (Exception ex) {
+            result.code = OpResult.FAILURE;
+            result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage();
+            LOG.error(result.message,ex);
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index adfe15a..2af49fb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -219,7 +219,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
-    public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) {
+    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
         List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
         if (size < 0 || size > result.size()) {
             size = result.size();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
deleted file mode 100644
index 933c02e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.IOException;
-import java.sql.*;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class JdbcDatabaseHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcDatabaseHandler.class);
-
-    private static final String INSERT_STATEMENT = "INSERT INTO %s VALUES (?, ?)";
-    private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?";
-    private static final String UPDATE_STATEMENT = "UPDATE %s set value=? WHERE id=?";
-    private static final String QUERY_ALL_STATEMENT = "SELECT value FROM %s";
-    private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?";
-    private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s";
-    private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s";
-    private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
-
-    public enum SortType { DESC, ASC }
-
-    private static Map<String, String> tblNameMap = new HashMap<>();
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-    private DataSource dataSource;
-
-    static {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-        registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
-        registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
-        registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
-        registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
-        registerTableName(Publishment.class.getSimpleName(), "publishment");
-        registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
-        registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
-        registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
-        registerTableName(Topology.class.getSimpleName(), "topology");
-        registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
-    }
-
-    private static void registerTableName(String clzName, String tblName) {
-        tblNameMap.put(clzName, tblName);
-    }
-
-    public JdbcDatabaseHandler(Config config) {
-        // "jdbc:mysql://dbhost/database?" + "user=sqluser&password=sqluserpw"
-        //this.tblNameMap = JdbcSchemaManager.tblNameMap;
-        try {
-            //JdbcSchemaManager.getInstance().init(config);
-            BasicDataSource bDatasource = new BasicDataSource();
-            bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
-            if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
-                bDatasource.setUsername(config.getString(MetadataUtils.JDBC_USERNAME_PATH));
-                bDatasource.setPassword(config.getString(MetadataUtils.JDBC_PASSWORD_PATH));
-            }
-            bDatasource.setUrl(config.getString(MetadataUtils.JDBC_CONNECTION_PATH));
-            if (config.hasPath(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)) {
-                bDatasource.setConnectionProperties(config.getString(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH));
-            }
-            this.dataSource = bDatasource;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    private String getTableName(String clzName) {
-        String tbl = tblNameMap.get(clzName);
-        if (tbl != null) {
-            return tbl;
-        } else {
-            return clzName;
-        }
-    }
-
-    public <T> OpResult addOrReplace(String clzName, T t) {
-        String tb = getTableName(clzName);
-        OpResult result = new OpResult();
-        PreparedStatement statement = null;
-        Savepoint savepoint = null;
-        String key = null;
-        String value = null;
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(String.format(INSERT_STATEMENT, tb));
-            key = MetadataUtils.getKey(t);
-            value = mapper.writeValueAsString(t);
-
-            statement.setString(1, key);
-            Clob clob = connection.createClob();
-            clob.setString(1, value);
-            statement.setClob(2, clob);
-
-            connection.setAutoCommit(false);
-            savepoint = connection.setSavepoint("insertEntity");
-            int status = statement.executeUpdate();
-            LOG.info("update {} entities", status);
-            connection.commit();
-        } catch (SQLException e) {
-            LOG.error(e.getMessage(), e.getCause());
-            if (connection != null) {
-                LOG.info("Detected duplicated entity");
-                try {
-                    connection.rollback(savepoint);
-                    update(tb, key, value);
-                } catch (SQLException e1) {
-                    //e1.printStackTrace();
-                    LOG.warn("Rollback failed", e1);
-                }
-            }
-        } catch (JsonProcessingException e) {
-            LOG.error("Got JsonProcessingException: {}", e.getMessage(), e.getCause());
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-
-    private <T> OpResult update(String tb, String key, String value) throws SQLException {
-        OpResult result = new OpResult();
-        PreparedStatement statement = null;
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(String.format(UPDATE_STATEMENT, tb));
-            Clob clob = connection.createClob();
-            clob.setString(1, value);
-            statement.setClob(1, clob);
-            statement.setString(2, key);
-
-            int status = statement.executeUpdate();
-            LOG.info("update {} entities from table {}", status, tb);
-        } catch (SQLException e) {
-            LOG.error(e.getMessage(), e);
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-        } finally {
-            if (statement != null) {
-                statement.close();
-            }
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-
-    public <T> List<T> list(Class<T> clz) {
-        String tb = getTableName(clz.getSimpleName());
-        String query = String.format(QUERY_ALL_STATEMENT, tb);
-        return executeSelectStatement(clz, query);
-    }
-
-    public <T> List<T> listSubset(Class<T> clz, int size) {
-        String tb = getTableName(clz.getSimpleName());
-        String query = String.format(QUERY_ALL_STATEMENT_WITH_SIZE, tb, size);
-        return executeSelectStatement(clz, query);
-    }
-
-    public <T> List<T> listOrderBy(Class<T> clz, String sortType) {
-        String tb = getTableName(clz.getSimpleName());
-        String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType);
-        return executeSelectStatement(clz, query);
-    }
-
-    public <T> T listWithFilter(String key, Class<T> clz) {
-        return executeSelectByIdStatement(clz, key);
-    }
-
-    public <T> T executeSelectByIdStatement(Class<T> clz, String id) {
-        String tb = getTableName(clz.getSimpleName());
-        List<T> result = new LinkedList<>();
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            PreparedStatement statement = connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb));
-            statement.setString(1, id);
-            ResultSet rs = statement.executeQuery();
-            while (rs.next()) {
-                //String key = rs.getString(1);
-                String json = rs.getString(1);
-                try {
-                    T obj = mapper.readValue(json, clz);
-                    result.add(obj);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-            }
-            rs.close();
-            statement.close();
-        } catch (SQLException e) {
-            LOG.error(e.getMessage(), e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        if (result.isEmpty()) {
-            return null;
-        } else {
-            return result.get(0);
-        }
-    }
-
-    public <T> List<T> executeSelectStatement(Class<T> clz, String query) {
-        String tb = getTableName(clz.getSimpleName());
-        List<T> result = new LinkedList<>();
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            Statement statement = connection.createStatement();
-            ResultSet rs = statement.executeQuery(query);
-            while (rs.next()) {
-                //String key = rs.getString(1);
-                String json = rs.getString(1);
-                try {
-                    T obj = mapper.readValue(json, clz);
-                    result.add(obj);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-            }
-            rs.close();
-            statement.close();
-        } catch (SQLException e) {
-            LOG.error(e.getMessage(), e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-
-    public <T> OpResult remove(String clzName, String key) {
-        String tb = getTableName(clzName);
-        OpResult result = new OpResult();
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
-            statement.setString(1, key);
-            int status = statement.executeUpdate();
-            String msg = String.format("delete %s entities from table %s", status, tb);
-            result.code = OpResult.SUCCESS;
-            result.message = msg;
-            statement.close();
-        } catch (SQLException e) {
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-            LOG.error(e.getMessage(), e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-
-    public void close() throws IOException {
-        //JdbcSchemaManager.getInstance().shutdown();
-    }
-
-    public OpResult removeBatch(String clzName, List<String> keys) {
-        String tb = getTableName(clzName);
-        OpResult result = new OpResult();
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            connection.setAutoCommit(false);
-            PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
-            for (String key : keys) {
-                statement.setString(1, key);
-                statement.addBatch();
-            }
-            int[] num = statement.executeBatch();
-            connection.commit();
-            int sum = 0;
-            for (int i : num) {
-                sum += i;
-            }
-            String msg = String.format("delete %s records from table %s", sum, tb);
-            result.code = OpResult.SUCCESS;
-            result.message = msg;
-            statement.close();
-        } catch (SQLException e) {
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-            LOG.error(e.getMessage(), e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-
-    public OpResult removeScheduleStates(int capacity) {
-        OpResult result = new OpResult();
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            PreparedStatement statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT);
-            statement.setInt(1, capacity);
-            result.message = String.format("delete %d records from schedule_state", statement.executeUpdate());
-            result.code = OpResult.SUCCESS;
-            statement.close();
-        } catch (SQLException e) {
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-            LOG.error(e.getMessage(), e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
-                }
-            }
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index 384eddc..b522451 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -35,18 +35,17 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * @since May 26, 2016.
  */
 public class JdbcMetadataDaoImpl implements IMetadataDao {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
-    private JdbcDatabaseHandler handler;
+    private JdbcMetadataHandler handler;
 
     @Inject
     public JdbcMetadataDaoImpl(Config config) {
-        handler = new JdbcDatabaseHandler(config.getConfig(MetadataUtils.META_DATA));
+        handler = new JdbcMetadataHandler(config.getConfig(MetadataUtils.META_DATA));
     }
 
     @Override
@@ -76,42 +75,49 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public List<Publishment> listPublishment() {
-        return handler.list(Publishment.class);
+        return handler.listPublishments();
     }
 
     @Override
     public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        List<AlertPublishEvent> result = handler.list(AlertPublishEvent.class);
-        if (size < 0 || size > result.size()) {
-            size = result.size();
+        if (size <= 0) {
+            LOG.info("Invalid parameter size <= 0");
+            return new ArrayList<>();
         }
-        return result.subList(result.size() - size, result.size());
+        return handler.listAlertEvents(null, null, size);
+    }
+
+    public PolicyDefinition getPolicyById(String policyId) {
+        return handler.queryById(PolicyDefinition.class, policyId);
+    }
+
+    public List<Publishment> getPublishmentsByPolicyId(String policyId) {
+        return handler.getPublishmentsByPolicyId(policyId);
     }
 
     @Override
     public AlertPublishEvent getAlertPublishEvent(String alertId) {
-        return handler.listWithFilter(alertId, AlertPublishEvent.class);
+        return handler.getAlertEventById(alertId, 1);
     }
 
     @Override
-    public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) {
-        List<AlertPublishEvent> alerts = handler.list(AlertPublishEvent.class);
-        List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
-        if (size < 0 || size > result.size()) {
-            size = result.size();
+    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
+        if (size <= 0) {
+            LOG.info("Invalid parameter size <= 0");
+            return new ArrayList<>();
         }
-        return result.subList(result.size() - size, result.size());
+        return handler.getAlertEventByPolicyId(policyId, size);
     }
 
     @Override
     public ScheduleState getScheduleState(String versionId) {
-        return handler.listWithFilter(versionId, ScheduleState.class);
+        return handler.queryById(ScheduleState.class, versionId);
     }
 
     @Override
     public ScheduleState getScheduleState() {
         List<ScheduleState> scheduleStates =
-                handler.listOrderBy(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+                handler.list(ScheduleState.class, JdbcMetadataHandler.SortType.DESC);
         if (scheduleStates.isEmpty()) {
             return null;
         } else {
@@ -146,7 +152,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return handler.addOrReplace(AlertPublishEvent.class.getSimpleName(), event);
+        return handler.addAlertEvent(event);
     }
 
     @Override
@@ -170,6 +176,11 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
+    public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
+        return handler.addPublishmentsToPolicy(policyId, publishmentIds);
+    }
+
+    @Override
     public OpResult addScheduleState(ScheduleState state) {
         return handler.addOrReplace(ScheduleState.class.getSimpleName(), state);
     }
@@ -196,37 +207,37 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult removeTopology(String topologyName) {
-        return handler.remove(Topology.class.getSimpleName(), topologyName);
+        return handler.removeById(Topology.class.getSimpleName(), topologyName);
     }
 
     @Override
     public OpResult removeCluster(String clusterId) {
-        return handler.remove(StreamingCluster.class.getSimpleName(), clusterId);
+        return handler.removeById(StreamingCluster.class.getSimpleName(), clusterId);
     }
 
     @Override
     public OpResult removeStream(String streamId) {
-        return handler.remove(StreamDefinition.class.getSimpleName(), streamId);
+        return handler.removeById(StreamDefinition.class.getSimpleName(), streamId);
     }
 
     @Override
     public OpResult removeDataSource(String datasourceId) {
-        return handler.remove(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
+        return handler.removeById(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
     }
 
     @Override
     public OpResult removePolicy(String policyId) {
-        return handler.remove(PolicyDefinition.class.getSimpleName(), policyId);
+        return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
     }
 
     @Override
     public OpResult removePublishment(String pubId) {
-        return handler.remove(Publishment.class.getSimpleName(), pubId);
+        return handler.removeById(Publishment.class.getSimpleName(), pubId);
     }
 
     @Override
     public OpResult removePublishmentType(String pubType) {
-        return handler.remove(PublishmentType.class.getSimpleName(), pubType);
+        return handler.removeById(PublishmentType.class.getSimpleName(), pubType);
     }
 
     @Override