You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/07 13:35:14 UTC
incubator-eagle git commit: [EAGLE-743] fix a bug in streamRouter
Repository: incubator-eagle
Updated Branches:
refs/heads/master 3f8a33d8b -> eaab4a9e0
[EAGLE-743] fix a bug in streamRouter
https://issues.apache.org/jira/browse/EAGLE-743
Author: Zhao, Qingwen <qi...@apache.org>
Closes #614 from qingwen220/EAGLE-743.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eaab4a9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eaab4a9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eaab4a9e
Branch: refs/heads/master
Commit: eaab4a9e0e5ec6077059c26fbf136ecf6d84a99f
Parents: 3f8a33d
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Nov 7 21:35:05 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Nov 7 21:35:05 2016 +0800
----------------------------------------------------------------------
.../impl/ZKMetadataChangeNotifyService.java | 3 -
.../impl/StreamRouterBoltOutputCollector.java | 2 +-
.../alert/engine/runner/AlertPublisherBolt.java | 4 +
.../TestStreamRouterBoltOutputCollector.java | 171 +++++++++++++++++++
...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 39 ++---
5 files changed, 189 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index bd30b87..2ffccaa 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -127,9 +127,6 @@ public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyS
} else {
prePopulate(alertSpec, state.getPolicySnapshots());
notifyAlertBolt(alertSpec, sds);
- if (state.getPublishSpecs().get(topologyId) != null) {
- notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds);
- }
}
break;
case ALERT_PUBLISH_BOLT:
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 11d4d9e..3a53b44 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -161,7 +161,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
for (StreamRouterSpec spec : modified) {
if (!copyRouteSpecMap.containsKey(spec.getPartition())
- || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+ || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
} else {
inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 9746ea4..323f682 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -126,6 +126,10 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
private void wrapAlertPublishEvent(AlertStreamEvent event) {
Map<String, Object> extraData = new HashedMap();
List<String> appIds = new ArrayList<>();
+ if (policyDefinitionMap == null || streamDefinitionMap == null) {
+ LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
+ return;
+ }
PolicyDefinition policyDefinition = policyDefinitionMap.get(event.getPolicyId());
if (this.policyDefinitionMap != null && policyDefinition != null) {
for (String inputStreamId : policyDefinition.getInputStreams()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
new file mode 100644
index 0000000..fd8ad61
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.text.ParseException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class TestStreamRouterBoltOutputCollector {
+
+ @Test
+ public void testStreamRouterCollector() throws ParseException {
+ String streamId = "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX";
+ StreamPartition partition = new StreamPartition();
+ partition.setStreamId(streamId);
+ partition.setType(StreamPartition.Type.GROUPBY);
+ partition.setColumns(new ArrayList<String>(){{
+ add("col1");
+ }});
+
+ // begin to create two router specs
+ WorkSlot worker1 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt1");
+ WorkSlot worker2 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt2");
+
+ PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
+ queue1.setPartition(partition);
+ queue1.setWorkers(new ArrayList<WorkSlot>(){ {
+ add(worker1);
+ }} );
+
+ PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
+ queue2.setPartition(partition);
+ queue2.setWorkers(new ArrayList<WorkSlot>(){ {
+ add(worker1);
+ add(worker2);
+ }} );
+
+ StreamRouterSpec spec1 = new StreamRouterSpec();
+ spec1.setStreamId(streamId);
+ spec1.setPartition(partition);
+
+ spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+ add(queue1);
+ }});
+
+ StreamRouterSpec spec2 = new StreamRouterSpec();
+ spec2.setStreamId(streamId);
+ spec2.setPartition(partition);
+
+ spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+ add(queue2);
+ }});
+
+ // the end of creating
+
+ List<String> targetStreamIds = new ArrayList<>();
+ IOutputCollector delegate = new IOutputCollector() {
+
+ @Override
+ public void reportError(Throwable error) {
+
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ targetStreamIds.add(streamId);
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ }
+
+ };
+
+ List<StreamRouterSpec> list1 = new ArrayList<>();
+ list1.add(spec1);
+
+ List<StreamRouterSpec> list2 = new ArrayList<>();
+ list2.add(spec2);
+
+ // construct StreamDefinition
+ StreamDefinition schema = new StreamDefinition();
+ schema.setStreamId(streamId);
+ StreamColumn column = new StreamColumn();
+ column.setName("col1");
+ column.setType(StreamColumn.Type.STRING);
+ schema.setColumns(Collections.singletonList(column));
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ sds.put(schema.getStreamId(), schema);
+
+ // create two events
+ StreamEvent event1 = new StreamEvent();
+ Object[] data = new Object[] {"value1"};
+ event1.setData(data);
+ event1.setStreamId(streamId);
+ PartitionedEvent pEvent1 = new PartitionedEvent();
+ pEvent1.setEvent(event1);
+ pEvent1.setPartition(partition);
+
+ StreamEvent event2 = new StreamEvent();
+ Object[] data2 = new Object[] {"value3"};
+ event2.setData(data2);
+ event2.setStreamId(streamId);
+ PartitionedEvent pEvent2 = new PartitionedEvent();
+ pEvent2.setEvent(event2);
+ pEvent2.setPartition(partition);
+
+ TopologyContext context = Mockito.mock(TopologyContext.class);
+ when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+ StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
+ StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new OutputCollector(delegate), null, streamContext, null);
+
+ // add a StreamRouterSpec which has one worker
+ collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);
+ collector.emit(pEvent1);
+ Assert.assertTrue(targetStreamIds.size() == 1);
+
+ // modify the StreamRouterSpec to contain two workers
+ collector.onStreamRouterSpecChange(new ArrayList<>(), new ArrayList<>(), list2, sds);
+ collector.emit(pEvent2);
+ Assert.assertTrue(targetStreamIds.size() == 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eaab4a9e/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 8df96df..801a183 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -201,34 +201,21 @@
</streams>
<docs>
<install>
-# Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-# Step 2: Set up data collector to flow data into kafka topic in
-
-./bin/logstash -f log_collector.conf
-
-## `log_collector.conf` sample as following:
-
-input {
-
-}
-filter {
-
-}
-output{
-
-}
-
-# Step 3: start application
-
-# Step 4: monitor with featured portal or alert with policies
+ <b>How to Install</b>
+ <ol>
+ <li>Create three kafka topics: <code>hdfs_audit_log_{SITE_ID}, hdfs_audit_log_enriched_{SITE_ID}, hdfs_audit_log_alert_{SITE_ID}</code></li>
+ <li>Setup a log collecting tool you like to stream audit log into topic <code>hdfs_audit_log_{SITE_ID}</code></li>
+ <li>Click "Install" button and edit configurations in general and advanced lists according to your requirements </li>
+ <li>Check the new generated stream <code>HDFS_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code> at Alert -> Streams</li>
+ </ol>
</install>
<uninstall>
-# Step 1: stop and uninstall application
-# Step 2: delete kafka topic named "${site}_example_source_topic"
-# Step 3: stop logstash
+ <b>How to Uninstall</b>
+ <ol>
+ <li>Click "Stop" button to stop the running application</li>
+ <li>Remove three kafka topics</li>
+ <li>Click "Uninstall" button which will remove stream <code>HDFS_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code></li>
+ </ol>
</uninstall>
</docs>
</application>