You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/18 21:20:54 UTC
[1/2] incubator-eagle git commit: EAGLE-475 Fix generic email
publisher and publish emails for absence alert. Fix generic email publisher
and publish emails for absence alert.
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 6fc103338 -> 5bb91c4bd
EAGLE-475 Fix generic email publisher and publish emails for absence alert.
Fix generic email publisher and publish emails for absence alert.
https://issues.apache.org/jira/browse/EAGLE-475
Author: @pkuwm <ih...@gmail.com>
Reviewer: @yonzhang <yo...@apache.org>
Closes: 359
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/076f3a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/076f3a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/076f3a49
Branch: refs/heads/develop
Commit: 076f3a491ae656bb9e938643ea4c0d5672bc5e04
Parents: 6fc1033
Author: yonzhang <yo...@gmail.com>
Authored: Thu Aug 18 14:01:06 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Thu Aug 18 14:01:06 2016 -0700
----------------------------------------------------------------------
.../evaluator/absence/AbsenceAlertDriver.java | 16 +-
.../evaluator/absence/AbsencePolicyHandler.java | 16 +-
.../publisher/email/AlertEmailConstants.java | 19 +-
.../publisher/email/AlertEmailSender.java | 8 +-
.../engine/publisher/email/EagleMailClient.java | 55 ++--
.../eagle/alert/engine/runner/AlertBolt.java | 9 +-
.../alert/engine/utils/AlertStreamUtils.java | 42 +++
.../alert/engine/utils/MetadataSerDeser.java | 15 +-
.../eagle/common/email/EagleMailClient.java | 253 -------------------
9 files changed, 117 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
index bf142cd..3b4aba8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -31,12 +31,12 @@ public class AbsenceAlertDriver {
private AbsenceWindowProcessor processor;
private AbsenceWindowGenerator windowGenerator;
- public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+ public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator) {
this.expectedAttrs = expectedAttrs;
this.windowGenerator = windowGenerator;
}
- public void process(List<Object> appearAttrs, long occurTime){
+ public boolean process(List<Object> appearAttrs, long occurTime) {
// initialize window
if(processor == null){
processor = nextProcessor(occurTime);
@@ -45,15 +45,21 @@ public class AbsenceAlertDriver {
processor.process(appearAttrs, occurTime);
AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
boolean expired = processor.checkExpired();
- if(expired){
+ boolean isAbsenceAlert = false;
+ if (expired){
if(status == AbsenceWindowProcessor.OccurStatus.absent){
// send alert
- LOG.info("this is an alert");
+ LOG.info("===================");
+ LOG.info("|| Absence Alert ||");
+ LOG.info("===================");
+ isAbsenceAlert = true;
// figure out next window and set the new window
}
processor = nextProcessor(occurTime);
LOG.info("created a new window {}", processor);
}
+
+ return isAbsenceAlert;
}
/**
@@ -61,7 +67,7 @@ public class AbsenceAlertDriver {
* @param currTime milliseconds
* @return
*/
- private AbsenceWindowProcessor nextProcessor(long currTime){
+ private AbsenceWindowProcessor nextProcessor(long currTime) {
AbsenceWindow window = windowGenerator.nextWindow(currTime);
return new AbsenceWindowProcessor(expectedAttrs, window);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
index 0a07a27..826a69d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -23,6 +23,7 @@ import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.utils.AlertStreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,15 +81,16 @@ public class AbsencePolicyHandler implements PolicyStreamHandler {
throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
// validate outputStream has to contain only one stream
if(policyDef.getOutputStreams().size() != 1)
- throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+ throw new IllegalArgumentException("policy outputStream size has to be 1 for absence alert");
String is = inputStreams.get(0);
StreamDefinition sd = sds.get(is);
String policyValue = policyDef.getDefinition().getValue();
- // assume that absence alert policy value consists of "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset}
- String[] segments = policyValue.split(",");
+ // Assume that absence alert policy value consists of
+ // "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset"
+ String[] segments = policyValue.split(",\\s*");
int offset = 0;
// populate wisb field names
int numOfFields = Integer.parseInt(segments[offset++]);
@@ -124,7 +126,13 @@ public class AbsencePolicyHandler implements PolicyStreamHandler {
columnValues.add(o.toString());
}
- driver.process(columnValues, event.getTimestamp());
+ boolean isAbsenceAlert = driver.process(columnValues, event.getTimestamp());
+
+ // Publishing alerts.
+ if (isAbsenceAlert) {
+ AlertStreamEvent alertEvent = AlertStreamUtils.createAlertEvent(event, context, sds);
+ collector.emit(alertEvent);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
index 7d31bb5..9010c93 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
@@ -20,20 +20,15 @@ package org.apache.eagle.alert.engine.publisher.email;
public class AlertEmailConstants {
- public static final String MAIL_AUTH = "mail.smtp.auth";
- public static final String MAIL_HOST = "mail.smtp.host";
- public static final String MAIL_PORT = "mail.smtp.port";
-
public static final String CONN_PLAINTEXT = "plaintext";
public static final String CONN_TLS = "tls";
public static final String CONN_SSL = "ssl";
- public static final String CONF_MAIL_SERVER = "smtp.server";
- public static final String CONF_MAIL_PORT = "smtp.port";
- public static final String CONF_MAIL_CONN = "connection";
- public static final String CONF_MAIL_DEBUG = "mailDebug";
- public static final String CONF_MAIL_AUTH = "smtp.auth.enable";
- public static final String CONF_AUTH_USER = "auth.username";
- public static final String CONF_AUTH_PASSWORD = "auth.password";
-
+ public static final String CONF_MAIL_HOST = "mail.smtp.host";
+ public static final String CONF_MAIL_PORT = "mail.smtp.port";
+ public static final String CONF_MAIL_AUTH = "mail.smtp.auth";
+ public static final String CONF_AUTH_USER = "mail.username";
+ public static final String CONF_AUTH_PASSWORD = "mail.password";
+ public static final String CONF_MAIL_CONN = "mail.connection";
+ public static final String CONF_MAIL_DEBUG = "mail.debug";
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
index d0e5cf6..4091457 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -80,17 +80,17 @@ public class AlertEmailSender implements Runnable {
private Properties parseMailClientConfig(Map<String, String> mailProps) {
if (mailProps == null) return null;
Properties props = new Properties();
- String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER);
+ String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_HOST);
String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
LOG.warn("SMTP server is unset, will exit");
return null;
}
- props.put(AlertEmailConstants.MAIL_HOST, mailHost);
- props.put(AlertEmailConstants.MAIL_PORT, mailPort);
+ props.put(AlertEmailConstants.CONF_MAIL_HOST, mailHost);
+ props.put(AlertEmailConstants.CONF_MAIL_PORT, mailPort);
String smtpAuth = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
- props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth);
+ props.put(AlertEmailConstants.CONF_MAIL_AUTH, smtpAuth);
if (Boolean.parseBoolean(smtpAuth)) {
props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
index 61194a2..21fe354 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
@@ -20,7 +20,6 @@ package org.apache.eagle.alert.engine.publisher.email;
import java.io.File;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,12 +50,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EagleMailClient {
-// private static final String CONFIG_FILE = "config.properties";
+ private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
private static final String BASE_PATH = "templates/";
private VelocityEngine velocityEngine;
private Session session;
- private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
public EagleMailClient() {
this(new Properties());
@@ -71,34 +69,37 @@ public class EagleMailClient {
config.put("mail.transport.protocol", "smtp");
if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
- session = Session.getDefaultInstance(config, new Authenticator() {
+ session = Session.getInstance(config, new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
- return new PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD));
+ return new PasswordAuthentication(
+ config.getProperty(AlertEmailConstants.CONF_AUTH_USER),
+ config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD)
+ );
}
});
+ } else {
+ session = Session.getInstance(config, new Authenticator() {});
}
- else session = Session.getDefaultInstance(config, new Authenticator() {});
- final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
- final boolean debug = Boolean.parseBoolean(debugMode);
+
+ final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
+ final boolean debug = Boolean.parseBoolean(debugMode);
+ LOG.info("Set email debug mode: " + debugMode);
session.setDebug(debug);
} catch (Exception e) {
- LOG.error("Failed connect to smtp server",e);
+ LOG.error("Failed to connect to smtp server", e);
}
}
- private boolean _send(String from, String to, String cc, String title,
- String content) {
+ private boolean _send(String from, String to, String cc, String title, String content) {
Message msg = new MimeMessage(session);
try {
msg.setFrom(new InternetAddress(from));
msg.setSubject(title);
if (to != null) {
- msg.setRecipients(Message.RecipientType.TO,
- InternetAddress.parse(to));
+ msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
}
if (cc != null) {
- msg.setRecipients(Message.RecipientType.CC,
- InternetAddress.parse(cc));
+ msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
}
//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
msg.setContent(content, "text/html;charset=utf-8");
@@ -106,10 +107,10 @@ public class EagleMailClient {
Transport.send(msg);
return true;
} catch (AddressException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+ LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
return false;
} catch (MessagingException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+ LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
return false;
}
}
@@ -120,12 +121,10 @@ public class EagleMailClient {
mail.setFrom(new InternetAddress(from));
mail.setSubject(title);
if (to != null) {
- mail.setRecipients(Message.RecipientType.TO,
- InternetAddress.parse(to));
+ mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
}
if (cc != null) {
- mail.setRecipients(Message.RecipientType.CC,
- InternetAddress.parse(cc));
+ mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
}
//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
@@ -146,10 +145,10 @@ public class EagleMailClient {
Transport.send(mail);
return true;
} catch (AddressException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+ LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
return false;
} catch (MessagingException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+ LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
return false;
}
}
@@ -176,6 +175,7 @@ public class EagleMailClient {
final StringWriter writer = new StringWriter();
t.merge(context, writer);
if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+
return this.send(from, to, cc, title, writer.toString());
}
@@ -187,7 +187,6 @@ public class EagleMailClient {
Template t = null;
List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
- Map<String,String> cid = new HashMap<String,String>();
for (Map.Entry<String,File> entry : attachments.entrySet()) {
final String attachment = entry.getKey();
@@ -200,7 +199,6 @@ public class EagleMailClient {
mimeBodyPart.setFileName(attachment);
mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
mimeBodyPart.setContentID(attachment);
- cid.put(attachment,mimeBodyPart.getContentID());
mimeBodyParts.add(mimeBodyPart);
} catch (MessagingException e) {
LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
@@ -209,9 +207,6 @@ public class EagleMailClient {
LOG.error("Attachment: " + attachment + " is null or not exists");
}
}
- //TODO remove cid, because not used at all
- if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
- context.put("cid", cid);
try {
t = velocityEngine.getTemplate(BASE_PATH + templatePath);
@@ -225,8 +220,7 @@ public class EagleMailClient {
} catch (ResourceNotFoundException e) {
try {
t = velocityEngine.getTemplate("/" + templatePath);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.error("Template not found:"+ "/" + templatePath, ex);
}
}
@@ -235,6 +229,7 @@ public class EagleMailClient {
final StringWriter writer = new StringWriter();
t.merge(context, writer);
if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+
return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index f653c9c..86c8b3d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -111,9 +111,12 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
pe.getEvent().setMetaVersion(specVersion);
}
// check if specVersion is older than stream_event_version
- else if (specVersion != null && stream_event_version != null && specVersion.contains("spec_version_") && specVersion.contains("spec_version_")){
- Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
- Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+ else if (specVersion != null && stream_event_version != null &&
+ specVersion.contains("spec_version_") && stream_event_version.contains("spec_version_")){
+// Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
+// Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+ Long timestamp_of_specVersion = Long.valueOf(specVersion.substring(13));
+ Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.substring(13));
specVersionOutofdate = timestamp_of_specVersion < timestamp_of_streamEventVersion;
if (!specVersionOutofdate){
pe.getEvent().setMetaVersion(specVersion);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
new file mode 100644
index 0000000..7e2941f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
@@ -0,0 +1,42 @@
+package org.apache.eagle.alert.engine.utils;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.Map;
+
+/**
+ * Utility methods for building alert stream events. Created on 8/16/16.
+ */
+public class AlertStreamUtils {
+
+ /**
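+ * Creates an alert stream event for the publisher, carrying the timestamp and data of the given event and the policy from the handler context.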
+ */
+ public static AlertStreamEvent createAlertEvent(StreamEvent event,
+ PolicyHandlerContext context,
+ Map<String, StreamDefinition> sds) {
+ PolicyDefinition policyDef = context.getPolicyDefinition();
+ AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+
+ alertStreamEvent.setTimestamp(event.getTimestamp());
+ alertStreamEvent.setData(event.getData());
+ alertStreamEvent.setStreamId(policyDef.getOutputStreams().get(0));
+ alertStreamEvent.setPolicy(policyDef);
+
+ if (context.getPolicyEvaluator() != null) {
+ alertStreamEvent.setCreatedBy(context.getPolicyEvaluator().getName());
+ }
+
+ alertStreamEvent.setCreatedTime(System.currentTimeMillis());
+
+ String is = policyDef.getInputStreams().get(0);
+ StreamDefinition sd = sds.get(is);
+ alertStreamEvent.setSchema(sd);
+
+ return alertStreamEvent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
index a576404..1060d32 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
@@ -40,7 +40,8 @@ public class MetadataSerDeser {
K spec = mapper.readValue(is, typeRef);
return spec;
}catch(Exception ex){
- LOG.error("error in deserializing metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+ LOG.error("error in deserializing metadata of type {} from input stream",
+ new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
}
return null;
}
@@ -52,7 +53,8 @@ public class MetadataSerDeser {
K spec = mapper.readValue(is, cls);
return spec;
}catch(Exception ex){
- LOG.error("Got error to deserialize metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+ LOG.error("Got error to deserialize metadata of type {} from input stream",
+ new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
}
return null;
}
@@ -64,7 +66,8 @@ public class MetadataSerDeser {
K spec = mapper.readValue(json, typeRef);
return spec;
}catch(Exception ex){
- LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+ LOG.error("error in deserializing metadata of type {} from {}",
+ new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
}
return null;
}
@@ -75,7 +78,8 @@ public class MetadataSerDeser {
K spec = mapper.readValue(json, cls);
return spec;
}catch(Exception ex){
- LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+ LOG.error("error in deserializing metadata of type {} from {}",
+ new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
}
return null;
}
@@ -86,7 +90,8 @@ public class MetadataSerDeser {
String json = mapper.writeValueAsString(spec);
return json;
}catch(Exception ex){
- LOG.error("error in serializing object {} with type {}", spec, new TypeReference<K>(){}.getType().getTypeName(), ex);
+ LOG.error("error in serializing object {} with type {}", spec,
+ new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
deleted file mode 100755
index 6edac0a..0000000
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.common.email;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.activation.DataHandler;
-import javax.activation.DataSource;
-import javax.activation.FileDataSource;
-import javax.mail.Authenticator;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.PasswordAuthentication;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-
-import org.apache.commons.configuration.AbstractConfiguration;
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.ResourceNotFoundException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.netflix.config.ConcurrentMapConfiguration;
-
-public class EagleMailClient {
-// private static final String CONFIG_FILE = "config.properties";
- private static final String BASE_PATH = "templates/";
- private static final String AUTH_CONFIG = "mail.smtp.auth";
- private static final String DEBUG_CONFIG = "mail.debug";
- private static final String USER_CONFIG = "mail.user";
- private static final String PASSWORD_CONFIG = "mail.password";
-
- private VelocityEngine velocityEngine;
- private Session session;
- private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
-
- public EagleMailClient() {
- this(new ConcurrentMapConfiguration());
- }
-
- public EagleMailClient(AbstractConfiguration configuration) {
- try {
- ConcurrentMapConfiguration con = (ConcurrentMapConfiguration)configuration;
- velocityEngine = new VelocityEngine();
- velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
- velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
- velocityEngine.init();
-
- con.setProperty("mail.transport.protocol", "smtp");
- final Properties config = con.getProperties();
- if(Boolean.parseBoolean(config.getProperty(AUTH_CONFIG))){
- session = Session.getDefaultInstance(config, new Authenticator() {
- protected PasswordAuthentication getPasswordAuthentication() {
- return new PasswordAuthentication(config.getProperty(USER_CONFIG), config.getProperty(PASSWORD_CONFIG));
- }
- });
- }
- else session = Session.getDefaultInstance(config, new Authenticator() {});
- final String debugMode = config.getProperty(DEBUG_CONFIG, "false");
- final boolean debug = Boolean.parseBoolean(debugMode);
- session.setDebug(debug);
- } catch (Exception e) {
- LOG.error("Failed connect to smtp server",e);
- }
- }
-
- private boolean _send(String from, String to, String cc, String title,
- String content) {
- Message msg = new MimeMessage(session);
- try {
- msg.setFrom(new InternetAddress(from));
- msg.setSubject(title);
- if (to != null) {
- msg.setRecipients(Message.RecipientType.TO,
- InternetAddress.parse(to));
- }
- if (cc != null) {
- msg.setRecipients(Message.RecipientType.CC,
- InternetAddress.parse(cc));
- }
- //msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
- msg.setContent(content, "text/html;charset=utf-8");
- LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-
- Transport.send(msg);
-
- return true;
- } catch (AddressException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
- return false;
- } catch (MessagingException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
- return false;
- }
- }
-
- private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){
- MimeMessage mail = new MimeMessage(session);
- try {
- mail.setFrom(new InternetAddress(from));
- mail.setSubject(title);
- if (to != null) {
- mail.setRecipients(Message.RecipientType.TO,
- InternetAddress.parse(to));
- }
- if (cc != null) {
- mail.setRecipients(Message.RecipientType.CC,
- InternetAddress.parse(cc));
- }
-
- //mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-
- MimeBodyPart mimeBodyPart = new MimeBodyPart();
- mimeBodyPart.setContent(content,"text/html;charset=utf-8");
-
- Multipart multipart = new MimeMultipart();
- multipart.addBodyPart(mimeBodyPart);
-
- for(MimeBodyPart attachment:attachments){
- multipart.addBodyPart(attachment);
- }
-
- mail.setContent(multipart);
-// mail.setContent(content, "text/html;charset=utf-8");
- LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-
- Transport.send(mail);
-
- return true;
- } catch (AddressException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
- return false;
- } catch (MessagingException e) {
- LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
- return false;
- }
- }
-
- public boolean send(String from, String to, String cc, String title,
- String content) {
- return this._send(from, to, cc, title, content);
- }
-
- public boolean send(String from, String to, String cc, String title,
- String templatePath, VelocityContext context) {
- Template t = null;
- try {
- t = velocityEngine.getTemplate(BASE_PATH + templatePath);
- } catch (ResourceNotFoundException ex) {
-
- }
- if (t == null) {
- try {
- t = velocityEngine.getTemplate(templatePath);
- } catch (ResourceNotFoundException e) {
- t = velocityEngine.getTemplate("/" + templatePath);
- }
- }
- final StringWriter writer = new StringWriter();
- t.merge(context, writer);
- if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
- return this.send(from, to, cc, title, writer.toString());
- }
-
- public boolean send(String from, String to, String cc, String title,
- String templatePath, VelocityContext context, Map<String,File> attachments) {
- if (attachments == null || attachments.isEmpty()) {
- return send(from, to, cc, title, templatePath, context);
- }
- Template t = null;
-
- List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
- Map<String,String> cid = new HashMap<String,String>();
-
- for (Map.Entry<String,File> entry : attachments.entrySet()) {
- final String attachment = entry.getKey();
- final File attachmentFile = entry.getValue();
- final MimeBodyPart mimeBodyPart = new MimeBodyPart();
- if(attachmentFile !=null && attachmentFile.exists()){
- DataSource source = new FileDataSource(attachmentFile);
- try {
- mimeBodyPart.setDataHandler(new DataHandler(source));
- mimeBodyPart.setFileName(attachment);
- mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
- mimeBodyPart.setContentID(attachment);
- cid.put(attachment,mimeBodyPart.getContentID());
- mimeBodyParts.add(mimeBodyPart);
- } catch (MessagingException e) {
- LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
- }
- }else{
- LOG.error("Attachment: " + attachment + " is null or not exists");
- }
- }
- //TODO remove cid, because not used at all
- if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
- context.put("cid", cid);
-
- try {
- t = velocityEngine.getTemplate(BASE_PATH + templatePath);
- } catch (ResourceNotFoundException ex) {
-// LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex);
- }
-
- if (t == null) {
- try {
- t = velocityEngine.getTemplate(templatePath);
- } catch (ResourceNotFoundException e) {
- try {
- t = velocityEngine.getTemplate("/" + templatePath);
- }
- catch (Exception ex) {
- LOG.error("Template not found:"+ "/" + templatePath, ex);
- }
- }
- }
-
- final StringWriter writer = new StringWriter();
- t.merge(context, writer);
- if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
- return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
- }
-}
[2/2] incubator-eagle git commit: missed license header
Posted by yo...@apache.org.
missed license header
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5bb91c4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5bb91c4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5bb91c4b
Branch: refs/heads/develop
Commit: 5bb91c4bd306ef6c612058dcbd88c14c29a49151
Parents: 076f3a4
Author: yonzhang <yo...@gmail.com>
Authored: Thu Aug 18 14:24:52 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Thu Aug 18 14:24:52 2016 -0700
----------------------------------------------------------------------
.../eagle/alert/engine/utils/AlertStreamUtils.java | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bb91c4b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
index 7e2941f..0eaf065 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.eagle.alert.engine.utils;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;