You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/07 13:35:14 UTC

incubator-eagle git commit: [EAGLE-743] fix a bug in streamRouter

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 3f8a33d8b -> eaab4a9e0


[EAGLE-743] fix a bug in streamRouter

https://issues.apache.org/jira/browse/EAGLE-743

Author: Zhao, Qingwen <qi...@apache.org>

Closes #614 from qingwen220/EAGLE-743.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eaab4a9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eaab4a9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eaab4a9e

Branch: refs/heads/master
Commit: eaab4a9e0e5ec6077059c26fbf136ecf6d84a99f
Parents: 3f8a33d
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Nov 7 21:35:05 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Nov 7 21:35:05 2016 +0800

----------------------------------------------------------------------
 .../impl/ZKMetadataChangeNotifyService.java     |   3 -
 .../impl/StreamRouterBoltOutputCollector.java   |   2 +-
 .../alert/engine/runner/AlertPublisherBolt.java |   4 +
 .../TestStreamRouterBoltOutputCollector.java    | 171 +++++++++++++++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |  39 ++---
 5 files changed, 189 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index bd30b87..2ffccaa 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -127,9 +127,6 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS
                 } else {
                     prePopulate(alertSpec, state.getPolicySnapshots());
                     notifyAlertBolt(alertSpec, sds);
-                    if (state.getPublishSpecs().get(topologyId) != null) {
-                        notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds);
-                    }
                 }
                 break;
             case ALERT_PUBLISH_BOLT:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 11d4d9e..3a53b44 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -161,7 +161,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
         for (StreamRouterSpec spec : modified) {
             if (!copyRouteSpecMap.containsKey(spec.getPartition())
-                || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+                || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
             } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 9746ea4..323f682 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -126,6 +126,10 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
     private void wrapAlertPublishEvent(AlertStreamEvent event) {
         Map<String, Object> extraData = new HashedMap();
         List<String> appIds = new ArrayList<>();
+        if (policyDefinitionMap == null || streamDefinitionMap == null) {
+            LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
+            return;
+        }
         PolicyDefinition policyDefinition = policyDefinitionMap.get(event.getPolicyId());
         if (this.policyDefinitionMap != null && policyDefinition != null) {
             for (String inputStreamId : policyDefinition.getInputStreams()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
new file mode 100644
index 0000000..fd8ad61
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.text.ParseException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link StreamRouterBoltOutputCollector}: adds a StreamRouterSpec, then replaces it
+ * through the "modified" list, and checks that each emitted event reaches the delegate collector.
+ */
+public class TestStreamRouterBoltOutputCollector {
+
+    /**
+     * Registers a spec whose target queue has one worker and emits an event, then passes a spec
+     * with the same partition but two workers as "modified" and emits a second event. The
+     * recording delegate must have seen exactly one emit after each step.
+     */
+    @Test
+    public void testStreamRouterCollector() throws ParseException {
+        String streamId = "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX";
+        StreamPartition partition = new StreamPartition();
+        partition.setStreamId(streamId);
+        partition.setType(StreamPartition.Type.GROUPBY);
+        partition.setColumns(new ArrayList<>(Collections.singletonList("col1")));
+
+        // begin to create two router specs
+        WorkSlot worker1 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt1");
+        WorkSlot worker2 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt2");
+
+        PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
+        queue1.setPartition(partition);
+        queue1.setWorkers(new ArrayList<>(Collections.singletonList(worker1)));
+
+        PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
+        queue2.setPartition(partition);
+        queue2.setWorkers(new ArrayList<>(Arrays.asList(worker1, worker2)));
+
+        StreamRouterSpec spec1 = new StreamRouterSpec();
+        spec1.setStreamId(streamId);
+        spec1.setPartition(partition);
+
+        spec1.setTargetQueue(new ArrayList<>(Collections.singletonList(queue1)));
+
+        StreamRouterSpec spec2 = new StreamRouterSpec();
+        spec2.setStreamId(streamId);
+        spec2.setPartition(partition);
+
+        spec2.setTargetQueue(new ArrayList<>(Collections.singletonList(queue2)));
+
+        // the end of creating
+
+        // recording delegate: remembers the stream id of every tuple passed to emit()
+        List<String> targetStreamIds = new ArrayList<>();
+        IOutputCollector delegate = new IOutputCollector() {
+
+            @Override
+            public void reportError(Throwable error) {
+
+            }
+
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                targetStreamIds.add(streamId);
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
+            @Override
+            public void ack(Tuple input) {
+            }
+
+            @Override
+            public void fail(Tuple input) {
+            }
+
+        };
+
+        List<StreamRouterSpec> list1 = new ArrayList<>();
+        list1.add(spec1);
+
+        List<StreamRouterSpec> list2 = new ArrayList<>();
+        list2.add(spec2);
+
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        StreamColumn column = new StreamColumn();
+        column.setName("col1");
+        column.setType(StreamColumn.Type.STRING);
+        schema.setColumns(Collections.singletonList(column));
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put(schema.getStreamId(), schema);
+
+        // create two events
+        StreamEvent event1 = new StreamEvent();
+        Object[] data = new Object[] {"value1"};
+        event1.setData(data);
+        event1.setStreamId(streamId);
+        PartitionedEvent pEvent1 = new PartitionedEvent();
+        pEvent1.setEvent(event1);
+        pEvent1.setPartition(partition);
+
+        StreamEvent event2 = new StreamEvent();
+        Object[] data2 = new Object[] {"value3"};
+        event2.setData(data2);
+        event2.setStreamId(streamId);
+        PartitionedEvent pEvent2 = new PartitionedEvent();
+        pEvent2.setEvent(event2);
+        pEvent2.setPartition(partition);
+
+        // StreamContext built on a mocked TopologyContext whose registerMetric is stubbed
+        TopologyContext context = Mockito.mock(TopologyContext.class);
+        when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+        StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
+        StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new OutputCollector(delegate), null, streamContext, null);
+
+        // add a StreamRouterSpec which has one worker
+        collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);
+        collector.emit(pEvent1);
+        Assert.assertEquals(1, targetStreamIds.size());
+
+        // modify the StreamRouterSpec to contain two workers
+        collector.onStreamRouterSpecChange(new ArrayList<>(), new ArrayList<>(), list2, sds);
+        collector.emit(pEvent2);
+        Assert.assertEquals(2, targetStreamIds.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 8df96df..801a183 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -201,34 +201,21 @@
     </streams>
     <docs>
         <install>
-# Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-# Step 2: Set up data collector to flow data into kafka topic in
-
-./bin/logstash -f log_collector.conf
-
-## `log_collector.conf` sample as following:
-
-input {
-
-}
-filter {
-
-}
-output{
-
-}
-
-# Step 3: start application
-
-# Step 4: monitor with featured portal or alert with policies
+            <b>How to Install</b>
+            <ol>
+                <li>Create three kafka topics: <code>hdfs_audit_log_{SITE_ID}, hdfs_audit_log_enriched_{SITE_ID}, hdfs_audit_log_alert_{SITE_ID}</code></li>
+                <li>Set up a log collecting tool of your choice to stream audit logs into topic <code>hdfs_audit_log_{SITE_ID}</code></li>
+                <li>Click the "Install" button and edit the configurations in the general and advanced lists according to your requirements</li>
+                <li>Check the newly generated stream <code>HDFS_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code> at Alert -> Streams</li>
+            </ol>
         </install>
         <uninstall>
-# Step 1: stop and uninstall application
-# Step 2: delete kafka topic named "${site}_example_source_topic"
-# Step 3: stop logstash
+            <b>How to Uninstall</b>
+            <ol>
+                <li>Click the "Stop" button to stop the running application</li>
+                <li>Remove the three kafka topics</li>
+                <li>Click the "Uninstall" button, which will remove stream <code>HDFS_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code></li>
+            </ol>
         </uninstall>
     </docs>
 </application>