You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/13 09:13:56 UTC

incubator-eagle git commit: EAGLE-594: Remove raw alert from kafka publisher

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 1d0f9f5b0 -> a3df07437


EAGLE-594: Remove raw alert from kafka publisher

We are leveraging configured deduplicator to dedup the duplicated alerts before
publish to kafka, email, slack, etc. However, sometimes we may still want to
keep the raw alerts in kafka. Here we have defined rawAlertNamespaceLabel and
rawAlertNamespaceValue custom fields to emit the raw alerts into kafka.
However, these configured namespace concept is ebay/sherlock specific, we
should remove it from eagle and use it ebay/sherlock extended kafka publisher.

Author: Li, Garrett
Reviewer: ralphsu

This closes #478


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a3df0743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a3df0743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a3df0743

Branch: refs/heads/master
Commit: a3df074379b3952c366e1d9d876d0223ca24ffed
Parents: 1d0f9f5
Author: Xiancheng Li <xi...@ebay.com>
Authored: Sun Oct 9 10:37:06 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Oct 13 16:51:23 2016 +0800

----------------------------------------------------------------------
 .../engine/publisher/PublishConstants.java      |  3 -
 .../publisher/impl/AlertKafkaPublisher.java     | 90 ++++++++++----------
 2 files changed, 44 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a3df0743/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index 7408779..0ba2313 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -51,7 +51,4 @@ public class PublishConstants {
     public static final String ALERT_EMAIL_POLICY = "policyId";
     public static final String ALERT_EMAIL_CREATOR = "creator";
     
-    public static final String RAW_ALERT_NAMESPACE_LABEL = "rawAlertNamespaceLabel";
-    public static final String RAW_ALERT_NAMESPACE_VALUE = "rawAlertNamespaceValue";
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a3df0743/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 5464ded..5df34e3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,8 +44,6 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
     private KafkaProducer producer;
     private String brokerList;
     private String topic;
-    private String namespaceLabel;
-    private String namespaceValue;
 
     @Override
     @SuppressWarnings("rawtypes")
@@ -58,42 +55,64 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
             producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig);
             topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
-            namespaceLabel = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_LABEL, "namespace");
-            namespaceValue = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_VALUE, "network");
         }
     }
 
-    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public void onAlert(AlertStreamEvent event) throws Exception {
         if (producer == null) {
             LOG.warn("KafkaProducer is null due to the incorrect configurations");
             return;
         }
-        List<AlertStreamEvent> outputEvents = new ArrayList<AlertStreamEvent>();
 
-        int namespaceColumnIndex = event.getSchema().getColumnIndex(namespaceLabel);
-        if (namespaceColumnIndex < 0 || namespaceColumnIndex >= event.getData().length) {
-            LOG.warn("Namespace column {} is not found, the found index {} is invalid",
-                namespaceLabel, namespaceColumnIndex);
-        } else {
-            // copy raw event to be duped
-            AlertStreamEvent newEvent = new AlertStreamEvent(event);
-            newEvent.getData()[namespaceColumnIndex] = namespaceValue;
-            outputEvents.add(newEvent);
+        this.emit(this.topic, this.dedup(event));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+        String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim();
+        String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim();
+        if (!newBrokerList.equals(this.brokerList)) {
+            producer.close();
+            brokerList = newBrokerList;
+            KafkaProducer newProducer = null;
+            try {
+                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties);
+            } catch (Exception e) {
+                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
+            }
+            producer = newProducer;
         }
+        topic = newTopic;
+    }
 
-        List<AlertStreamEvent> dedupResults = dedup(event);
-        if (dedupResults != null) {
-            outputEvents.addAll(dedupResults);
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    @SuppressWarnings( {"rawtypes", "unchecked"})
+    protected PublishStatus emit(String topic, List<AlertStreamEvent> outputEvents) {
+        // we need to check producer here since the producer is invisable to extended kafka publisher
+        if (producer == null) {
+            LOG.warn("KafkaProducer is null due to the incorrect configurations");
+            return null;
         }
-        PublishStatus status = new PublishStatus();
+        if (outputEvents == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Alert stream events list in publishment is empty");
+            }
+            return null;
+        }
+        this.status = new PublishStatus();
         try {
             for (AlertStreamEvent outputEvent : outputEvents) {
                 ProducerRecord record = createRecord(outputEvent, topic);
                 if (record == null) {
-                    LOG.error(" Alert serialize return null, ignored message! ");
-                    return;
+                    LOG.error("Alert serialize return null, ignored message! ");
+                    return null;
                 }
                 Future<?> future = producer.send(record);
                 future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -112,32 +131,11 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             status.successful = false;
             status.errorMessage = ex.getMessage();
         }
-        this.status = status;
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-        String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim();
-        String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim();
-        if (!newBrokerList.equals(this.brokerList)) {
-            producer.close();
-            brokerList = newBrokerList;
-            KafkaProducer newProducer = null;
-            try {
-                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties);
-            } catch (Exception e) {
-                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
-            }
-            producer = newProducer;
-        }
-        topic = newTopic;
+        return status;
     }
 
-    @Override
-    public void close() {
-        producer.close();
+    protected String getTopic() {
+        return this.topic;
     }
 
     private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception {