You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/02 10:36:22 UTC
[2/2] incubator-eagle git commit: [EAGLE-811] Refactor JdbcMetadataDaoImpl of alert engine metadata
[EAGLE-811] Refactor JdbcMetadataDaoImpl of alert engine metadata
* Tickets
https://issues.apache.org/jira/browse/EAGLE-811
https://issues.apache.org/jira/browse/EAGLE-808
* Fix the 'alertId is null' bug in the email publisher
* Improve the UnitAlertApplication config
Author: Zhao, Qingwen <qi...@apache.org>
Closes #705 from qingwen220/EAGLE-811.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/30e35de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/30e35de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/30e35de6
Branch: refs/heads/master
Commit: 30e35de60c532502731ef5b51360df15c0026397
Parents: aef7ea3
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Dec 2 18:36:14 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Dec 2 18:36:14 2016 +0800
----------------------------------------------------------------------
...e.alert.app.AlertUnitTopologyAppProvider.xml | 47 +-
.../src/test/resource/application.conf | 9 +-
.../alert/engine/model/AlertStreamEvent.java | 1 +
.../eagle/alert/utils/AlertConstants.java | 2 +
.../src/test/resources/application.conf | 9 +-
.../eagle/alert/engine/StreamContext.java | 16 +
.../publisher/email/AlertEmailGenerator.java | 7 +-
.../email/AlertEmailGeneratorBuilder.java | 4 +-
.../publisher/email/AlertEmailSender.java | 44 +-
.../publisher/impl/AlertEmailPublisher.java | 47 +-
.../src/main/resources/application.conf | 9 +-
.../publisher/AlertEmailPublisherTest.java | 4 +-
.../src/test/resources/application-test.conf | 31 +-
.../metadata/resource/MetadataResource.java | 76 +--
.../eagle/alert/metadata/IMetadataDao.java | 77 ++-
.../metadata/impl/InMemMetadataDaoImpl.java | 2 +-
.../metadata/impl/JdbcDatabaseHandler.java | 399 ---------------
.../metadata/impl/JdbcMetadataDaoImpl.java | 61 ++-
.../metadata/impl/JdbcMetadataHandler.java | 506 +++++++++++++++++++
.../metadata/impl/MongoMetadataDaoImpl.java | 2 +-
.../eagle/alert/metadata/impl/InMemoryTest.java | 2 +-
.../eagle/alert/metadata/impl/JdbcImplTest.java | 103 +++-
.../alert-metadata/src/test/resources/init.sql | 79 +--
.../src/main/resources/application.conf | 9 +-
.../eagle/app/service/ApplicationAction.java | 13 +-
.../app/service/ApplicationActionTest.java | 9 +-
.../src/test/resources/application.conf | 9 +-
.../eagle/common/TestSerializableUtils.java | 6 +-
.../eagle/metadata/model/AlertEntity.java | 1 -
.../src/test/resources/application-test.xml | 1 +
.../src/main/bin/createTables.sql | 78 ++-
eagle-server-assembly/src/main/conf/eagle.conf | 7 +
.../src/main/resources/application.conf | 9 +-
.../app/dev/public/js/services/policySrv.js | 4 +-
34 files changed, 1022 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 8ee8b6b..74e97d3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -76,76 +76,39 @@
<!-- alert spout configuration -->
<property>
<name>spout.kafkaBrokerZkQuorum</name>
- <displayName>Kafka Zookeeper Quorum</displayName>
+ <displayName>Kafka Spout Broker Zookeeper Quorum</displayName>
<value>localhost:2181</value>
<description>Zookeeper quorum of kafka broker for spout to consume data</description>
<required>true</required>
</property>
<property>
<name>spout.kafkaBrokerZkBasePath</name>
- <displayName>Kafka Zookeeper Root</displayName>
+ <displayName>Kafka Spout Broker Zookeeper Root</displayName>
<value>/brokers</value>
<description>Zookeeper znode path for kafka brokers</description>
<required>false</required>
</property>
<property>
<name>spout.stormKafkaUseSameZkQuorumWithKafkaBroker</name>
- <displayName>Reuse Kafka Zookeeper</displayName>
+ <displayName>Spout Transaction Zookeeper to Reuse Broker Zookeeper</displayName>
<value>true</value>
<description>Use same zookeeper for kafka server and kafka consumer(Storm-Kafka)</description>
<required>false</required>
</property>
<property>
<name>spout.stormKafkaTransactionZkPath</name>
- <displayName>Kafka Transaction ZkPath</displayName>
+ <displayName>Spout Transaction Zookeeper Path</displayName>
<value>/consumers</value>
<description>Zookeeper path for storm kafka transaction</description>
<required>false</required>
</property>
<property>
<name>spout.stormKafkaEagleConsumer</name>
- <displayName>Kafka Consumer ID</displayName>
+ <displayName>Spout Consumer ID</displayName>
<value>eagle_consumer</value>
<description>Zookeeper quorum for spout to consume data</description>
<required>true</required>
</property>
-
- <!-- zk config for alert engine -->
- <property>
- <name>zkConfig.zkQuorum</name>
- <displayName>Coordinator Zookeeper Quorum</displayName>
- <value>localhost:2181</value>
- <description>Zookeeper quorum for alert engine</description>
- <required>true</required>
- </property>
- <property>
- <name>zkConfig.zkRoot</name>
- <displayName>Coordinator Zookeeper Root</displayName>
- <value>/alert</value>
- <description>Zookeeper znode path for alert engine</description>
- <required>false</required>
- </property>
- <property>
- <name>metadataService.context</name>
- <displayName>Metadata Service Context Path</displayName>
- <value>/rest</value>
- <description>Metadata service context path</description>
- <required>false</required>
- </property>
- <property>
- <name>metadataService.host</name>
- <displayName>Metadata Service Host</displayName>
- <value>localhost</value>
- <description>Metadata service host</description>
- <required>true</required>
- </property>
- <property>
- <name>metadataService.port</name>
- <displayName>Metadata Service Port</displayName>
- <value>9090</value>
- <description>Metadata service port</description>
- <required>true</required>
- </property>
</configuration>
<docs>
<install>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
index cfb2d47..6c15b1b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/resource/application.conf
@@ -46,7 +46,14 @@
"metadataService": {
"host": "localhost",
"port": 8080,
- "context": "/rest"
+ "context": "/rest",
+ mailSmtpServer = "",
+ mailSmtpPort = 25,
+ mailSmtpAuth = "false"
+ //mailSmtpConn = "plaintext",
+ //mailSmtpUsername = ""
+ //mailSmtpPassword = ""
+ //mailSmtpDebug = false
}
"metadataDynamicCheck": {
"initDelayMillis": 1000,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index c0a709d..b7f0132 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -123,6 +123,7 @@ public class AlertStreamEvent extends StreamEvent {
}
public String getAlertId() {
+ ensureAlertId();
return alertId;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
index 566944f..ee2c28c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -26,4 +26,6 @@ public class AlertConstants {
public static final String DEFAULT_ROUTERBOLT_NAME = "streamRouterBolt";
public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
+
+ public static final String COORDINATOR = "coordinator";
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 72b8012..5d4da38 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -32,7 +32,14 @@
"metadataService": {
"host": "localhost",
"port": 8080,
- "context": "/rest"
+ "context": "/rest",
+ mailSmtpServer = "localhost",
+ mailSmtpPort = 25,
+ mailSmtpAuth = "false"
+ //mailSmtpConn = "plaintext",
+ //mailSmtpUsername = ""
+ //mailSmtpPassword = ""
+ //mailSmtpDebug = false
}
"metadataDynamicCheck": {
"initDelayMillis": 1000,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
index bafba83..c2d5f2e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.eagle.alert.engine;
import com.typesafe.config.Config;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 8a69c37..809bb09 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.*;
public class AlertEmailGenerator {
@@ -43,7 +44,7 @@ public class AlertEmailGenerator {
private String serverHost = "localhost";
private int serverPort = 80;
- private Map<String, Object> properties;
+ private Properties properties;
private ThreadPoolExecutor executorPool;
@@ -173,11 +174,11 @@ public class AlertEmailGenerator {
this.subject = subject;
}
- public Map<String, Object> getProperties() {
+ public Properties getProperties() {
return properties;
}
- public void setProperties(Map<String, Object> properties) {
+ public void setProperties(Properties properties) {
this.properties = properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
index b018d5c..f6debab 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -17,7 +17,7 @@
*/
package org.apache.eagle.alert.engine.publisher.email;
-import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
public class AlertEmailGeneratorBuilder {
@@ -51,7 +51,7 @@ public class AlertEmailGeneratorBuilder {
return this;
}
- public AlertEmailGeneratorBuilder withMailProps(Map<String, Object> mailProps) {
+ public AlertEmailGeneratorBuilder withMailProps(Properties mailProps) {
generator.setProperties(mailProps);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
index 1152d24..f573215 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -43,10 +43,7 @@ public class AlertEmailSender implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
private static final int MAX_RETRY_COUNT = 3;
-
- private Map<String, Object> mailProps;
-
-
+ private Properties mailProps;
private String threadName;
/**
@@ -73,45 +70,11 @@ public class AlertEmailSender implements Runnable {
LOG.info("Initialized " + threadName + ": origin is : " + this.origin + ", recipient of the email: " + this.recipients + ", velocity TPL file: " + this.configFileName);
}
- public AlertEmailSender(AlertEmailContext alertEmail, Map<String, Object> mailProps) {
+ public AlertEmailSender(AlertEmailContext alertEmail, Properties mailProps) {
this(alertEmail);
this.mailProps = mailProps;
}
- private Properties parseMailClientConfig(Map<String, Object> mailProps) {
- if (mailProps == null) {
- return null;
- }
- Properties props = new Properties();
- String mailHost = (String) mailProps.get(AlertEmailConstants.CONF_MAIL_HOST);
- String mailPort = (String) mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
- if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
- LOG.warn("SMTP server is unset, will exit");
- return null;
- }
- props.put(AlertEmailConstants.CONF_MAIL_HOST, mailHost);
- props.put(AlertEmailConstants.CONF_MAIL_PORT, mailPort);
-
- String smtpAuth = (String) mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
- props.put(AlertEmailConstants.CONF_MAIL_AUTH, smtpAuth);
- if (Boolean.parseBoolean(smtpAuth)) {
- props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
- props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
- }
-
- String smtpConn = (String) mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, AlertEmailConstants.CONN_PLAINTEXT);
- if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
- props.put("mail.smtp.starttls.enable", "true");
- }
- if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
- props.put("mail.smtp.socketFactory.port", "465");
- props.put("mail.smtp.socketFactory.class",
- "javax.net.ssl.SSLSocketFactory");
- }
- props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
- return props;
- }
-
@Override
public void run() {
int count = 0;
@@ -121,8 +84,7 @@ public class AlertEmailSender implements Runnable {
try {
final EagleMailClient client;
if (mailProps != null) {
- Properties props = parseMailClientConfig(mailProps);
- client = new EagleMailClient(props);
+ client = new EagleMailClient(mailProps);
} else {
client = new EagleMailClient();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 40237ee..7431d35 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -21,6 +21,7 @@ package org.apache.eagle.alert.engine.publisher.impl;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
import com.typesafe.config.Config;
@@ -32,10 +33,13 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
+
public class AlertEmailPublisher extends AbstractPublishPlugin {
private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
@@ -43,12 +47,21 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+ private static final String EAGLE_CORRELATION_SMTP_SERVER = "metadataService.mailSmtpServer";
+ private static final String EAGLE_CORRELATION_SMTP_PORT = "metadataService.mailSmtpPort";
+ private static final String EAGLE_CORRELATION_SMTP_CONN = "metadataService.mailSmtpConn";
+ private static final String EAGLE_CORRELATION_SMTP_AUTH = "metadataService.mailSmtpAuth";
+ private static final String EAGLE_CORRELATION_SMTP_USERNAME = "metadataService.mailSmtpUsername";
+ private static final String EAGLE_CORRELATION_SMTP_PASSWORD = "metadataService.mailSmtpPassword";
+ private static final String EAGLE_CORRELATION_SMTP_DEBUG = "metadataService.mailSmtpDebug";
+
private AlertEmailGenerator emailGenerator;
private Map<String, Object> emailConfig;
private transient ThreadPoolExecutor executorPool;
private String serverHost;
private int serverPort;
+ private Properties mailClientProperties;
@Override
@SuppressWarnings("rawtypes")
@@ -58,6 +71,7 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
? config.getString(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) : "localhost";
this.serverPort = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT)
? config.getInt(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) : 80;
+ this.mailClientProperties = parseMailClientConfig(config);
executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
LOG.info(" Creating Email Generator... ");
@@ -65,7 +79,37 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
emailConfig = new HashMap<>(publishment.getProperties());
emailGenerator = createEmailGenerator(emailConfig);
}
+ }
+
+ private Properties parseMailClientConfig(Config config) {
+ Properties props = new Properties();
+ String mailSmtpServer = config.getString(EAGLE_CORRELATION_SMTP_SERVER);
+ String mailSmtpPort = config.getString(EAGLE_CORRELATION_SMTP_PORT);
+ String mailSmtpAuth = config.getString(EAGLE_CORRELATION_SMTP_AUTH);
+
+ props.put(AlertEmailConstants.CONF_MAIL_HOST, mailSmtpServer);
+ props.put(AlertEmailConstants.CONF_MAIL_PORT, mailSmtpPort);
+ props.put(AlertEmailConstants.CONF_MAIL_AUTH, mailSmtpAuth);
+
+ if (Boolean.parseBoolean(mailSmtpAuth)) {
+ String mailSmtpUsername = config.getString(EAGLE_CORRELATION_SMTP_USERNAME);
+ String mailSmtpPassword = config.getString(EAGLE_CORRELATION_SMTP_PASSWORD);
+ props.put(AlertEmailConstants.CONF_AUTH_USER, mailSmtpUsername);
+ props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailSmtpPassword);
+ }
+ String mailSmtpConn = config.hasPath(EAGLE_CORRELATION_SMTP_CONN) ? config.getString(EAGLE_CORRELATION_SMTP_CONN) : AlertEmailConstants.CONN_PLAINTEXT;
+ String mailSmtpDebug = config.hasPath(EAGLE_CORRELATION_SMTP_DEBUG) ? config.getString(EAGLE_CORRELATION_SMTP_DEBUG) : "false";
+ if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
+ props.put("mail.smtp.starttls.enable", "true");
+ }
+ if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
+ props.put("mail.smtp.socketFactory.port", "465");
+ props.put("mail.smtp.socketFactory.class",
+ "javax.net.ssl.SSLSocketFactory");
+ }
+ props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailSmtpDebug);
+ return props;
}
@Override
@@ -126,8 +170,9 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
LOG.warn("email sender or recipients is null");
return null;
}
+
AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder()
- .withMailProps(notificationConfig)
+ .withMailProps(this.mailClientProperties)
.withSubject(subject)
.withSender(sender)
.withRecipients(recipients)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index 754c00b..b151e99 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -38,7 +38,14 @@
"metadataService": {
"context": "/rest",
"host": "localhost",
- "port": 9090
+ "port": 9090,
+ mailSmtpServer = "",
+ mailSmtpPort = 25,
+ mailSmtpAuth = "false"
+ //mailSmtpConn = "plaintext",
+ //mailSmtpUsername = ""
+ //mailSmtpPassword = ""
+ //mailSmtpDebug = false
},
"metric": {
"sink": {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
index 3f3141a..1f131a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
@@ -39,7 +39,7 @@ public class AlertEmailPublisherTest {
@Before
public void setUp(){
- config = ConfigFactory.load();
+ config = ConfigFactory.load("application-test.conf");
server = SimpleSmtpServer.start(SMTP_PORT);
}
@@ -57,8 +57,6 @@ public class AlertEmailPublisherTest {
properties.put(PublishConstants.SUBJECT,EMAIL_PUBLISHER_TEST_POLICY);
properties.put(PublishConstants.SENDER,"eagle@localhost");
properties.put(PublishConstants.RECIPIENTS,"somebody@localhost");
- properties.put(AlertEmailConstants.CONF_MAIL_HOST,"localhost");
- properties.put(AlertEmailConstants.CONF_MAIL_PORT,String.valueOf(SMTP_PORT));
Publishment publishment = new Publishment();
publishment.setName("testEmailPublishment");
publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
index dc016f4..3573507 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
@@ -48,21 +48,28 @@
"metadataService": {
"context": "/api",
"host": "localhost",
- "port": 8080
+ "port": 8080,
+ mailSmtpServer = "localhost",
+ mailSmtpPort = 5025,
+ mailSmtpAuth = "false"
+ //mailSmtpConn = "plaintext",
+ //mailSmtpUsername = ""
+ //mailSmtpPassword = ""
+ //mailSmtpDebug = false
},
"metric": {
"sink": {
- // "kafka": {
- // "topic": "alert_metric_test"
- // "bootstrap.servers": "localhost:9092"
- // }
- "logger": {
- "level": "INFO"
- }
- "elasticsearch": {
- "hosts": ["localhost:9200"]
- "index": "alert_metric_test"
- }
+// "kafka": {
+// "topic": "alert_metric_test"
+// "bootstrap.servers": "localhost:9092"
+// }
+// "logger": {
+// "level": "INFO"
+// }
+// "elasticsearch": {
+// "hosts": ["localhost:9200"]
+// "index": "alert_metric_test"
+// }
}
},
"connection": "mongodb://localhost:27017"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 751853c..c814252 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -16,8 +16,6 @@
*/
package org.apache.eagle.service.metadata.resource;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -32,12 +30,12 @@ import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.stream.Collectors;
import javax.ws.rs.*;
/**
@@ -253,86 +251,26 @@ public class MetadataResource {
@GET
public List<AlertPublishEvent> getAlertPublishEventByPolicyId(@PathParam("policyId") String policyId,
@QueryParam("size") int size) {
- return dao.getAlertPublishEventByPolicyId(policyId, size);
+ return dao.getAlertPublishEventsByPolicyId(policyId, size);
}
@Path("/policies/{policyId}/publishments")
@GET
public List<Publishment> getPolicyPublishments(@PathParam("policyId") String policyId) {
- return dao.listPublishment().stream().filter(ps ->
- ps.getPolicyIds() != null && ps.getPolicyIds().contains(policyId)
- ).collect(Collectors.toList());
+ return dao.getPublishmentsByPolicyId(policyId);
}
@Path("/policies/{policyId}/publishments")
@POST
public OpResult addPublishmentsToPolicy(@PathParam("policyId") String policyId, List<String> publishmentIds) {
- OpResult result = new OpResult();
- if (publishmentIds == null || publishmentIds.size() == 0) {
- result.code = OpResult.FAILURE;
- result.message = "Failed to add policy, there is no publisher in it";
- return result;
- }
- try {
- getPolicyByID(policyId);
- Map<String,Publishment> publishmentMap = new HashMap<>();
- listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub));
- for (String publishmentId : publishmentIds) {
- if (publishmentMap.containsKey(publishmentId)) {
- Publishment publishment = publishmentMap.get(publishmentId);
- if (publishment.getPolicyIds() == null) {
- publishment.setPolicyIds(new ArrayList<>());
- }
- if (publishment.getPolicyIds().contains(policyId)) {
- LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId);
- } else {
- publishment.getPolicyIds().add(policyId);
- }
- OpResult opResult = addPublishment(publishment);
- if (opResult.code == OpResult.FAILURE) {
- LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message);
- return opResult;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(opResult.message);
- }
- }
- } else {
- throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
- }
- }
-
- //for other publishments, remove policyId from them, work around, we should refactor
- for (String publishmentId : publishmentMap.keySet()) {
- if (publishmentIds.contains(publishmentId)) {
- continue;
- }
- Publishment publishment = publishmentMap.get(publishmentId);
- if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
- publishment.getPolicyIds().remove(policyId);
- OpResult opResult = addPublishment(publishment);
- if (opResult.code == OpResult.FAILURE) {
- LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
- return opResult;
- }
- }
- }
- result.code = OpResult.SUCCESS;
- result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
- LOG.info(result.message);
- } catch (Exception ex) {
- result.code = OpResult.FAILURE;
- result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage();
- LOG.error(result.message,ex);
- }
- return result;
+ return dao.addPublishmentsToPolicy(policyId, publishmentIds);
}
@Path("/policies/{policyId}")
@GET
- public List<PolicyDefinition> getPolicyByID(@PathParam("policyId") String policyId) {
+ public PolicyDefinition getPolicyById(@PathParam("policyId") String policyId) {
Preconditions.checkNotNull(policyId, "policyId");
- return dao.listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).collect(Collectors.toList());
+ return dao.getPolicyById(policyId);
}
@Path("/policies/{policyId}/status/{status}")
@@ -340,7 +278,7 @@ public class MetadataResource {
public OpResult updatePolicyStatusByID(@PathParam("policyId") String policyId, @PathParam("status") PolicyDefinition.PolicyStatus status) {
OpResult result = new OpResult();
try {
- PolicyDefinition policyDefinition = getPolicyByID(policyId).get(0);
+ PolicyDefinition policyDefinition = getPolicyById(policyId);
policyDefinition.setPolicyStatus(status);
OpResult updateResult = addPolicy(policyDefinition);
result.code = updateResult.code;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index c5221c2..2dc7f51 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -17,6 +17,7 @@
package org.apache.eagle.alert.metadata;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -30,7 +31,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public interface IMetadataDao extends Closeable {
@@ -68,7 +73,6 @@ public interface IMetadataDao extends Closeable {
OpResult addPublishment(Publishment publishment);
-
OpResult removePublishment(String pubId);
List<PublishmentType> listPublishmentType();
@@ -81,7 +85,7 @@ public interface IMetadataDao extends Closeable {
AlertPublishEvent getAlertPublishEvent(String alertId);
- List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size);
+ List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size);
OpResult addAlertPublishEvent(AlertPublishEvent event);
@@ -112,11 +116,78 @@ public interface IMetadataDao extends Closeable {
Logger LOG = LoggerFactory.getLogger(IMetadataDao.class);
- default PolicyDefinition getPolicyByID(String policyId) {
+ default PolicyDefinition getPolicyById(String policyId) {
Preconditions.checkNotNull(policyId,"policyId");
return listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).findAny().orElseGet(() -> {
LOG.error("Policy (policyId " + policyId + ") not found");
throw new IllegalArgumentException("Policy (policyId " + policyId + ") not found");
});
}
+
+ default List<Publishment> getPublishmentsByPolicyId(String policyId) {
+ return listPublishment().stream().filter(ps ->
+ ps.getPolicyIds() != null && ps.getPolicyIds().contains(policyId)
+ ).collect(Collectors.toList());
+ }
+
+ default OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
+ OpResult result = new OpResult();
+ if (publishmentIds == null || publishmentIds.size() == 0) {
+ result.code = OpResult.FAILURE;
+ result.message = "Failed to add policy, there is no publisher in it";
+ return result;
+ }
+ try {
+ Map<String,Publishment> publishmentMap = new HashMap<>();
+ listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub));
+ for (String publishmentId : publishmentIds) {
+ if (publishmentMap.containsKey(publishmentId)) {
+ Publishment publishment = publishmentMap.get(publishmentId);
+ if (publishment.getPolicyIds() == null) {
+ publishment.setPolicyIds(new ArrayList<>());
+ }
+ if (publishment.getPolicyIds().contains(policyId)) {
+ LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId);
+ } else {
+ publishment.getPolicyIds().add(policyId);
+ }
+ OpResult opResult = addPublishment(publishment);
+ if (opResult.code == OpResult.FAILURE) {
+ LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message);
+ return opResult;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(opResult.message);
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
+ }
+ }
+
+ //for other publishments, remove policyId from them, work around, we should refactor
+ for (String publishmentId : publishmentMap.keySet()) {
+ if (publishmentIds.contains(publishmentId)) {
+ continue;
+ }
+ Publishment publishment = publishmentMap.get(publishmentId);
+ if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) {
+ publishment.getPolicyIds().remove(policyId);
+ OpResult opResult = addPublishment(publishment);
+ if (opResult.code == OpResult.FAILURE) {
+ LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message);
+ return opResult;
+ }
+ }
+ }
+ result.code = OpResult.SUCCESS;
+ result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId;
+ LOG.info(result.message);
+ } catch (Exception ex) {
+ result.code = OpResult.FAILURE;
+ result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage();
+ LOG.error(result.message,ex);
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index adfe15a..2af49fb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -219,7 +219,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
}
@Override
- public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) {
+ public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
if (size < 0 || size > result.size()) {
size = result.size();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
deleted file mode 100644
index 933c02e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.IOException;
-import java.sql.*;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class JdbcDatabaseHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcDatabaseHandler.class);
-
- private static final String INSERT_STATEMENT = "INSERT INTO %s VALUES (?, ?)";
- private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?";
- private static final String UPDATE_STATEMENT = "UPDATE %s set value=? WHERE id=?";
- private static final String QUERY_ALL_STATEMENT = "SELECT value FROM %s";
- private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?";
- private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s";
- private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s";
- private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
-
- public enum SortType { DESC, ASC }
-
- private static Map<String, String> tblNameMap = new HashMap<>();
-
- private static final ObjectMapper mapper = new ObjectMapper();
- private DataSource dataSource;
-
- static {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
- registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
- registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
- registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
- registerTableName(Publishment.class.getSimpleName(), "publishment");
- registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
- registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
- registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
- registerTableName(Topology.class.getSimpleName(), "topology");
- registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
- }
-
- private static void registerTableName(String clzName, String tblName) {
- tblNameMap.put(clzName, tblName);
- }
-
- public JdbcDatabaseHandler(Config config) {
- // "jdbc:mysql://dbhost/database?" + "user=sqluser&password=sqluserpw"
- //this.tblNameMap = JdbcSchemaManager.tblNameMap;
- try {
- //JdbcSchemaManager.getInstance().init(config);
- BasicDataSource bDatasource = new BasicDataSource();
- bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
- if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
- bDatasource.setUsername(config.getString(MetadataUtils.JDBC_USERNAME_PATH));
- bDatasource.setPassword(config.getString(MetadataUtils.JDBC_PASSWORD_PATH));
- }
- bDatasource.setUrl(config.getString(MetadataUtils.JDBC_CONNECTION_PATH));
- if (config.hasPath(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)) {
- bDatasource.setConnectionProperties(config.getString(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH));
- }
- this.dataSource = bDatasource;
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- private String getTableName(String clzName) {
- String tbl = tblNameMap.get(clzName);
- if (tbl != null) {
- return tbl;
- } else {
- return clzName;
- }
- }
-
- public <T> OpResult addOrReplace(String clzName, T t) {
- String tb = getTableName(clzName);
- OpResult result = new OpResult();
- PreparedStatement statement = null;
- Savepoint savepoint = null;
- String key = null;
- String value = null;
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- statement = connection.prepareStatement(String.format(INSERT_STATEMENT, tb));
- key = MetadataUtils.getKey(t);
- value = mapper.writeValueAsString(t);
-
- statement.setString(1, key);
- Clob clob = connection.createClob();
- clob.setString(1, value);
- statement.setClob(2, clob);
-
- connection.setAutoCommit(false);
- savepoint = connection.setSavepoint("insertEntity");
- int status = statement.executeUpdate();
- LOG.info("update {} entities", status);
- connection.commit();
- } catch (SQLException e) {
- LOG.error(e.getMessage(), e.getCause());
- if (connection != null) {
- LOG.info("Detected duplicated entity");
- try {
- connection.rollback(savepoint);
- update(tb, key, value);
- } catch (SQLException e1) {
- //e1.printStackTrace();
- LOG.warn("Rollback failed", e1);
- }
- }
- } catch (JsonProcessingException e) {
- LOG.error("Got JsonProcessingException: {}", e.getMessage(), e.getCause());
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- } finally {
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-
- private <T> OpResult update(String tb, String key, String value) throws SQLException {
- OpResult result = new OpResult();
- PreparedStatement statement = null;
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- statement = connection.prepareStatement(String.format(UPDATE_STATEMENT, tb));
- Clob clob = connection.createClob();
- clob.setString(1, value);
- statement.setClob(1, clob);
- statement.setString(2, key);
-
- int status = statement.executeUpdate();
- LOG.info("update {} entities from table {}", status, tb);
- } catch (SQLException e) {
- LOG.error(e.getMessage(), e);
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- } finally {
- if (statement != null) {
- statement.close();
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-
- public <T> List<T> list(Class<T> clz) {
- String tb = getTableName(clz.getSimpleName());
- String query = String.format(QUERY_ALL_STATEMENT, tb);
- return executeSelectStatement(clz, query);
- }
-
- public <T> List<T> listSubset(Class<T> clz, int size) {
- String tb = getTableName(clz.getSimpleName());
- String query = String.format(QUERY_ALL_STATEMENT_WITH_SIZE, tb, size);
- return executeSelectStatement(clz, query);
- }
-
- public <T> List<T> listOrderBy(Class<T> clz, String sortType) {
- String tb = getTableName(clz.getSimpleName());
- String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType);
- return executeSelectStatement(clz, query);
- }
-
- public <T> T listWithFilter(String key, Class<T> clz) {
- return executeSelectByIdStatement(clz, key);
- }
-
- public <T> T executeSelectByIdStatement(Class<T> clz, String id) {
- String tb = getTableName(clz.getSimpleName());
- List<T> result = new LinkedList<>();
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb));
- statement.setString(1, id);
- ResultSet rs = statement.executeQuery();
- while (rs.next()) {
- //String key = rs.getString(1);
- String json = rs.getString(1);
- try {
- T obj = mapper.readValue(json, clz);
- result.add(obj);
- } catch (IOException e) {
- LOG.error("deserialize config item failed!", e);
- }
- }
- rs.close();
- statement.close();
- } catch (SQLException e) {
- LOG.error(e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- if (result.isEmpty()) {
- return null;
- } else {
- return result.get(0);
- }
- }
-
- public <T> List<T> executeSelectStatement(Class<T> clz, String query) {
- String tb = getTableName(clz.getSimpleName());
- List<T> result = new LinkedList<>();
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- Statement statement = connection.createStatement();
- ResultSet rs = statement.executeQuery(query);
- while (rs.next()) {
- //String key = rs.getString(1);
- String json = rs.getString(1);
- try {
- T obj = mapper.readValue(json, clz);
- result.add(obj);
- } catch (IOException e) {
- LOG.error("deserialize config item failed!", e);
- }
- }
- rs.close();
- statement.close();
- } catch (SQLException e) {
- LOG.error(e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-
- public <T> OpResult remove(String clzName, String key) {
- String tb = getTableName(clzName);
- OpResult result = new OpResult();
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
- statement.setString(1, key);
- int status = statement.executeUpdate();
- String msg = String.format("delete %s entities from table %s", status, tb);
- result.code = OpResult.SUCCESS;
- result.message = msg;
- statement.close();
- } catch (SQLException e) {
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- LOG.error(e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-
- public void close() throws IOException {
- //JdbcSchemaManager.getInstance().shutdown();
- }
-
- public OpResult removeBatch(String clzName, List<String> keys) {
- String tb = getTableName(clzName);
- OpResult result = new OpResult();
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- connection.setAutoCommit(false);
- PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
- for (String key : keys) {
- statement.setString(1, key);
- statement.addBatch();
- }
- int[] num = statement.executeBatch();
- connection.commit();
- int sum = 0;
- for (int i : num) {
- sum += i;
- }
- String msg = String.format("delete %s records from table %s", sum, tb);
- result.code = OpResult.SUCCESS;
- result.message = msg;
- statement.close();
- } catch (SQLException e) {
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- LOG.error(e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-
- public OpResult removeScheduleStates(int capacity) {
- OpResult result = new OpResult();
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT);
- statement.setInt(1, capacity);
- result.message = String.format("delete %d records from schedule_state", statement.executeUpdate());
- result.code = OpResult.SUCCESS;
- statement.close();
- } catch (SQLException e) {
- result.code = OpResult.FAILURE;
- result.message = e.getMessage();
- LOG.error(e.getMessage(), e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
- }
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/30e35de6/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index 384eddc..b522451 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -35,18 +35,17 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/**
* @since May 26, 2016.
*/
public class JdbcMetadataDaoImpl implements IMetadataDao {
private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
- private JdbcDatabaseHandler handler;
+ private JdbcMetadataHandler handler;
@Inject
public JdbcMetadataDaoImpl(Config config) {
- handler = new JdbcDatabaseHandler(config.getConfig(MetadataUtils.META_DATA));
+ handler = new JdbcMetadataHandler(config.getConfig(MetadataUtils.META_DATA));
}
@Override
@@ -76,42 +75,49 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
@Override
public List<Publishment> listPublishment() {
- return handler.list(Publishment.class);
+ return handler.listPublishments();
}
@Override
public List<AlertPublishEvent> listAlertPublishEvent(int size) {
- List<AlertPublishEvent> result = handler.list(AlertPublishEvent.class);
- if (size < 0 || size > result.size()) {
- size = result.size();
+ if (size <= 0) {
+ LOG.info("Invalid parameter size <= 0");
+ return new ArrayList<>();
}
- return result.subList(result.size() - size, result.size());
+ return handler.listAlertEvents(null, null, size);
+ }
+
+ public PolicyDefinition getPolicyById(String policyId) {
+ return handler.queryById(PolicyDefinition.class, policyId);
+ }
+
+ public List<Publishment> getPublishmentsByPolicyId(String policyId) {
+ return handler.getPublishmentsByPolicyId(policyId);
}
@Override
public AlertPublishEvent getAlertPublishEvent(String alertId) {
- return handler.listWithFilter(alertId, AlertPublishEvent.class);
+ return handler.getAlertEventById(alertId, 1);
}
@Override
- public List<AlertPublishEvent> getAlertPublishEventByPolicyId(String policyId, int size) {
- List<AlertPublishEvent> alerts = handler.list(AlertPublishEvent.class);
- List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
- if (size < 0 || size > result.size()) {
- size = result.size();
+ public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
+ if (size <= 0) {
+ LOG.info("Invalid parameter size <= 0");
+ return new ArrayList<>();
}
- return result.subList(result.size() - size, result.size());
+ return handler.getAlertEventByPolicyId(policyId, size);
}
@Override
public ScheduleState getScheduleState(String versionId) {
- return handler.listWithFilter(versionId, ScheduleState.class);
+ return handler.queryById(ScheduleState.class, versionId);
}
@Override
public ScheduleState getScheduleState() {
List<ScheduleState> scheduleStates =
- handler.listOrderBy(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+ handler.list(ScheduleState.class, JdbcMetadataHandler.SortType.DESC);
if (scheduleStates.isEmpty()) {
return null;
} else {
@@ -146,7 +152,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
@Override
public OpResult addAlertPublishEvent(AlertPublishEvent event) {
- return handler.addOrReplace(AlertPublishEvent.class.getSimpleName(), event);
+ return handler.addAlertEvent(event);
}
@Override
@@ -170,6 +176,11 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
}
@Override
+ public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
+ return handler.addPublishmentsToPolicy(policyId, publishmentIds);
+ }
+
+ @Override
public OpResult addScheduleState(ScheduleState state) {
return handler.addOrReplace(ScheduleState.class.getSimpleName(), state);
}
@@ -196,37 +207,37 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
@Override
public OpResult removeTopology(String topologyName) {
- return handler.remove(Topology.class.getSimpleName(), topologyName);
+ return handler.removeById(Topology.class.getSimpleName(), topologyName);
}
@Override
public OpResult removeCluster(String clusterId) {
- return handler.remove(StreamingCluster.class.getSimpleName(), clusterId);
+ return handler.removeById(StreamingCluster.class.getSimpleName(), clusterId);
}
@Override
public OpResult removeStream(String streamId) {
- return handler.remove(StreamDefinition.class.getSimpleName(), streamId);
+ return handler.removeById(StreamDefinition.class.getSimpleName(), streamId);
}
@Override
public OpResult removeDataSource(String datasourceId) {
- return handler.remove(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
+ return handler.removeById(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
}
@Override
public OpResult removePolicy(String policyId) {
- return handler.remove(PolicyDefinition.class.getSimpleName(), policyId);
+ return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
}
@Override
public OpResult removePublishment(String pubId) {
- return handler.remove(Publishment.class.getSimpleName(), pubId);
+ return handler.removeById(Publishment.class.getSimpleName(), pubId);
}
@Override
public OpResult removePublishmentType(String pubType) {
- return handler.remove(PublishmentType.class.getSimpleName(), pubType);
+ return handler.removeById(PublishmentType.class.getSimpleName(), pubType);
}
@Override