You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/19 17:37:38 UTC
incubator-metron git commit: METRON-103 Unit Tests for storm bolts
(merrimanr) closes apache/incubator-metron#81
Repository: incubator-metron
Updated Branches:
refs/heads/master 2b67b1f28 -> 25e732661
METRON-103 Unit Tests for storm bolts (merrimanr) closes apache/incubator-metron#81
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/25e73266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/25e73266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/25e73266
Branch: refs/heads/master
Commit: 25e7326612c8a76a8cf18f9d9988457d860f4e5c
Parents: 2b67b1f
Author: merrimanr <me...@gmail.com>
Authored: Tue Apr 19 10:36:13 2016 -0500
Committer: rmerriman <rm...@hortonworks.com>
Committed: Tue Apr 19 10:36:13 2016 -0500
----------------------------------------------------------------------
deployment/roles/metron_ui/tasks/main.yml | 2 +-
metron-streaming/Metron-Common/pom.xml | 6 +
.../org/apache/metron/bolt/ConfiguredBolt.java | 53 +++--
.../org/apache/metron/bolt/BaseBoltTest.java | 48 -----
.../metron/bolt/BulkMessageWriterBoltTest.java | 149 ++++++++++++++
.../apache/metron/bolt/ConfiguredBoltTest.java | 27 ++-
.../org/apache/metron/bolt/JoinBoltTest.java | 120 ++++++++++++
.../org/apache/metron/bolt/SplitBoltTest.java | 124 ++++++++++++
.../metron/utils/ConfigurationsUtilsTest.java | 1 -
.../enrichment/bolt/EnrichmentJoinBolt.java | 7 -
.../enrichment/bolt/EnrichmentJoinBoltTest.java | 86 ++++++++
.../bolt/EnrichmentSplitterBoltTest.java | 97 +++++++++
.../bolt/GenericEnrichmentBoltTest.java | 195 +++++++++++++++++++
.../bolt/ThreatIntelJoinBoltTest.java | 105 ++++++++++
.../bolt/ThreatIntelSplitterBoltTest.java | 45 +++++
metron-streaming/Metron-MessageParsers/pom.xml | 6 +
.../org/apache/metron/bolt/ParserBoltTest.java | 90 +++++++++
.../Metron-TestingUtilities/pom.xml | 48 ++++-
.../org/apache/metron/bolt/BaseBoltTest.java | 93 +++++++++
.../metron/bolt/BaseEnrichmentBoltTest.java | 93 +++++++++
.../topologies/enrichment/remote.yaml | 6 -
.../topologies/enrichment/test.yaml | 6 -
22 files changed, 1309 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/deployment/roles/metron_ui/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/tasks/main.yml b/deployment/roles/metron_ui/tasks/main.yml
index f4e0513..fd3422b 100644
--- a/deployment/roles/metron_ui/tasks/main.yml
+++ b/deployment/roles/metron_ui/tasks/main.yml
@@ -53,4 +53,4 @@
production: no
- name: Start Metron UI
- shell: "pm2 restart {{ metron_ui_directory }}/lib/metron-ui.js --name metron"
+ shell: "pm2 start {{ metron_ui_directory }}/lib/metron-ui.js --name metron"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 80b6a17..6985e6d 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -194,6 +194,12 @@
<version>2.0.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-TestingUtilities</artifactId>
+ <version>0.1BETA</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 40a1f08..2a1cb13 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -42,39 +42,54 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
private String zookeeperUrl;
protected final Configurations configurations = new Configurations();
- private CuratorFramework client;
- private TreeCache cache;
+ protected CuratorFramework client;
+ protected TreeCache cache;
public ConfiguredBolt(String zookeeperUrl) {
this.zookeeperUrl = zookeeperUrl;
}
+ public Configurations getConfigurations() {
+ return configurations;
+ }
+
+ public void setCuratorFramework(CuratorFramework client) {
+ this.client = client;
+ }
+
+ public void setTreeCache(TreeCache cache) {
+ this.cache = cache;
+ }
+
protected void reloadCallback(String name, Configurations.Type type) {
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
try {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ if (client == null) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ }
client.start();
- cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
- TreeCacheListener listener = new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
- String path = event.getData().getPath();
- byte[] data = event.getData().getData();
- updateConfig(path, data);
+ if (cache == null) {
+ cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+ TreeCacheListener listener = new TreeCacheListener() {
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+ String path = event.getData().getPath();
+ byte[] data = event.getData().getData();
+ updateConfig(path, data);
+ }
}
+ };
+ cache.getListenable().addListener(listener);
+ try {
+ ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+ } catch (Exception e) {
+ LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
}
- };
- cache.getListenable().addListener(listener);
- try {
- ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
- }
- catch(Exception e) {
- LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
}
cache.start();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
deleted file mode 100644
index ada3854..0000000
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.bolt;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.metron.Constants;
-import org.apache.metron.utils.ConfigurationsUtils;
-import org.junit.Before;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public abstract class BaseBoltTest {
-
- public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
- protected String zookeeperUrl;
- protected Set<String> allConfigurationTypes = new HashSet<>();
-
- @Before
- public void setupConfiguration() throws Exception {
- TestingServer testZkServer = new TestingServer(true);
- this.zookeeperUrl = testZkServer.getConnectString();
- byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
- ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
- allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
- Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
- for (String sensorType : sensorEnrichmentConfigs.keySet()) {
- ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
- allConfigurationTypes.add(sensorType);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
new file mode 100644
index 0000000..a66acf2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BulkMessageWriterBoltTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
+
+ protected class MessageListMatcher extends ArgumentMatcher<List<JSONObject>> {
+
+ private List<JSONObject> expectedMessageList;
+
+ public MessageListMatcher(List<JSONObject> expectedMessageList) {
+ this.expectedMessageList = expectedMessageList;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ List<JSONObject> actualMessageList = (List<JSONObject>) o;
+ for(JSONObject message: actualMessageList) removeTimingFields(message);
+ return expectedMessageList.equals(actualMessageList);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("[%s]", expectedMessageList));
+ }
+
+ }
+
+ /**
+ * {
+ * "field": "value",
+ * "source.type": "yaf"
+ * }
+ */
+ @Multiline
+ private String sampleMessageString;
+
+ private JSONObject sampleMessage;
+ private List<JSONObject> messageList;
+ private List<Tuple> tupleList;
+
+ @Before
+ public void parseMessages() throws ParseException {
+ JSONParser parser = new JSONParser();
+ sampleMessage = (JSONObject) parser.parse(sampleMessageString);
+ sampleMessage.put("field", "value1");
+ messageList = new ArrayList<>();
+ messageList.add(((JSONObject) sampleMessage.clone()));
+ sampleMessage.put("field", "value2");
+ messageList.add(((JSONObject) sampleMessage.clone()));
+ sampleMessage.put("field", "value3");
+ messageList.add(((JSONObject) sampleMessage.clone()));
+ sampleMessage.put("field", "value4");
+ messageList.add(((JSONObject) sampleMessage.clone()));
+ sampleMessage.put("field", "value5");
+ messageList.add(((JSONObject) sampleMessage.clone()));
+ }
+
+ @Mock
+ private BulkMessageWriter<JSONObject> bulkMessageWriter;
+
+ @Test
+ public void test() throws Exception {
+ BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl").withBulkMessageWriter(bulkMessageWriter);
+ bulkMessageWriterBolt.setCuratorFramework(client);
+ bulkMessageWriterBolt.setTreeCache(cache);
+ bulkMessageWriterBolt.configurations.updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ bulkMessageWriterBolt.declareOutputFields(declarer);
+ verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+ Map stormConf = new HashMap();
+ doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(Configurations.class));
+ try {
+ bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+ fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
+ } catch(RuntimeException e) {}
+ reset(bulkMessageWriter);
+ bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+ verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(Configurations.class));
+ tupleList = new ArrayList<>();
+ for(int i = 0; i < 4; i++) {
+ when(tuple.getValueByField("message")).thenReturn(messageList.get(i));
+ tupleList.add(tuple);
+ bulkMessageWriterBolt.execute(tuple);
+ verify(bulkMessageWriter, times(0)).write(eq(sensorType), any(Configurations.class), eq(tupleList), eq(messageList));
+ }
+ when(tuple.getValueByField("message")).thenReturn(messageList.get(4));
+ tupleList.add(tuple);
+ bulkMessageWriterBolt.execute(tuple);
+ verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(Configurations.class), eq(tupleList), argThat(new MessageListMatcher(messageList)));
+ verify(outputCollector, times(5)).ack(tuple);
+ reset(outputCollector);
+ doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(Configurations.class), Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class));
+ when(tuple.getValueByField("message")).thenReturn(messageList.get(0));
+ for(int i = 0; i < 5; i++) {
+ bulkMessageWriterBolt.execute(tuple);
+ }
+ verify(outputCollector, times(0)).ack(tuple);
+ verify(outputCollector, times(5)).fail(tuple);
+ verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+ verify(outputCollector, times(1)).reportError(any(Throwable.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
index 6c538f0..9b12592 100644
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
@@ -17,15 +17,15 @@
*/
package org.apache.metron.bolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
+import org.apache.curator.test.TestingServer;
import org.apache.metron.Constants;
import org.apache.metron.domain.Configurations;
import org.apache.metron.domain.SensorEnrichmentConfig;
import org.apache.metron.utils.ConfigurationsUtils;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@@ -35,11 +35,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.mockito.Mockito.mock;
-
-public class ConfiguredBoltTest extends BaseBoltTest {
-
+public class ConfiguredBoltTest extends BaseEnrichmentBoltTest {
private static Set<String> configsUpdated = new HashSet<>();
+ private Set<String> allConfigurationTypes = new HashSet<>();
+ private String zookeeperUrl;
public static class StandAloneConfiguredBolt extends ConfiguredBolt {
@@ -61,11 +60,23 @@ public class ConfiguredBoltTest extends BaseBoltTest {
}
}
+ @Before
+ public void setupConfiguration() throws Exception {
+ TestingServer testZkServer = new TestingServer(true);
+ this.zookeeperUrl = testZkServer.getConnectString();
+ byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+ allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+ Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+ for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+ allConfigurationTypes.add(sensorType);
+ }
+ }
+
@Test
public void test() throws Exception {
Configurations sampleConfigurations = new Configurations();
- TopologyContext topologyContext = mock(TopologyContext.class);
- OutputCollector outputCollector = mock(OutputCollector.class);
try {
StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
new file mode 100644
index 0000000..49f5c73
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/JoinBoltTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JoinBoltTest extends BaseEnrichmentBoltTest {
+
+ public class StandAloneJoinBolt extends JoinBolt<JSONObject> {
+
+ public StandAloneJoinBolt(String zookeeperUrl) {
+ super(zookeeperUrl);
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext) {
+
+ }
+
+ @Override
+ public Set<String> getStreamIds(JSONObject value) {
+ return streamIds;
+ }
+
+ @Override
+ public JSONObject joinMessages(Map<String, JSONObject> streamMessageMap) {
+ return joinedMessage;
+ }
+ }
+
+ /**
+ {
+ "joinField": "joinValue"
+ }
+ */
+ @Multiline
+ private String joinedMessageString;
+
+ private JSONObject joinedMessage;
+
+ @Before
+ public void parseMessages() {
+ JSONParser parser = new JSONParser();
+ try {
+ joinedMessage = (JSONObject) parser.parse(joinedMessageString);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void test() {
+ StandAloneJoinBolt joinBolt = new StandAloneJoinBolt("zookeeperUrl");
+ joinBolt.setCuratorFramework(client);
+ joinBolt.setTreeCache(cache);
+ try {
+ joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("Should fail if a maxCacheSize property is not set");
+ } catch(IllegalStateException e) {}
+ joinBolt.withMaxCacheSize(100);
+ try {
+ joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("Should fail if a maxTimeRetain property is not set");
+ } catch(IllegalStateException e) {}
+ joinBolt.withMaxTimeRetain(10000);
+ joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ joinBolt.declareOutputFields(declarer);
+ verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
+ when(tuple.getValueByField("key")).thenReturn(key);
+ when(tuple.getSourceStreamId()).thenReturn("geo");
+ when(tuple.getValueByField("message")).thenReturn(geoMessage);
+ joinBolt.execute(tuple);
+ verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class));
+ verify(outputCollector, times(0)).ack(tuple);
+ when(tuple.getSourceStreamId()).thenReturn("host");
+ when(tuple.getValueByField("message")).thenReturn(hostMessage);
+ joinBolt.execute(tuple);
+ verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class));
+ verify(outputCollector, times(0)).ack(tuple);
+ when(tuple.getSourceStreamId()).thenReturn("hbaseEnrichment");
+ when(tuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage);
+ joinBolt.execute(tuple);
+ verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage)));
+ verify(outputCollector, times(1)).ack(tuple);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
new file mode 100644
index 0000000..1091ffd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/SplitBoltTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import junit.framework.Assert;
+import org.apache.metron.domain.Configurations;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class SplitBoltTest extends BaseEnrichmentBoltTest {
+
+ public class StandAloneSplitBolt extends SplitBolt<JSONObject> {
+
+ public StandAloneSplitBolt(String zookeeperUrl) {
+ super(zookeeperUrl);
+ }
+
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext) {
+
+ }
+
+ @Override
+ public Set<String> getStreamIds() {
+ return streamIds;
+ }
+
+ @Override
+ public String getKey(Tuple tuple, JSONObject message) {
+ return key;
+ }
+
+ @Override
+ public JSONObject generateMessage(Tuple tuple) {
+ return sampleMessage;
+ }
+
+ @Override
+ public Map<String, JSONObject> splitMessage(JSONObject message) {
+ return null;
+ }
+
+ @Override
+ public void declareOther(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void emitOther(Tuple tuple, JSONObject message) {
+
+ }
+ }
+
+ @Test
+ public void test() {
+ StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl"));
+ splitBolt.setCuratorFramework(client);
+ splitBolt.setTreeCache(cache);
+ doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(Configurations.Type.class));
+ splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ splitBolt.declareOutputFields(declarer);
+ verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
+ for(String streamId: streamIds) {
+ verify(declarer, times(1)).declareStream(eq(streamId), argThat(new FieldsMatcher("key", "message")));
+ }
+ verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+
+ JSONObject sampleMessage = splitBolt.generateMessage(tuple);
+ Map<String, JSONObject> streamMessageMap = new HashMap<>();
+ streamMessageMap.put("geo", geoMessage);
+ streamMessageMap.put("host", hostMessage);
+ streamMessageMap.put("hbaseEnrichment", hbaseEnrichmentMessage);
+ doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
+ splitBolt.execute(tuple);
+ verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, sampleMessage)));
+ verify(outputCollector, times(1)).emit(eq("geo"), eq(new Values(key, geoMessage)));
+ verify(outputCollector, times(1)).emit(eq("host"), eq(new Values(key, hostMessage)));
+ verify(outputCollector, times(1)).emit(eq("hbaseEnrichment"), eq(new Values(key, hbaseEnrichmentMessage)));
+ verify(outputCollector, times(1)).ack(tuple);
+ streamMessageMap = new HashMap<>();
+ streamMessageMap.put("host", null);
+ doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
+ try {
+ splitBolt.execute(tuple);
+ Assert.fail("An exception should be thrown when splitMessage produces a null value for a stream");
+ }catch (IllegalArgumentException e) {}
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
index 3214f0e..65bef92 100644
--- a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
@@ -62,7 +62,6 @@ public class ConfigurationsUtilsTest {
ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(testSensorType, testSensorConfigBytes, zookeeperUrl);
byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(testSensorType, client);
Assert.assertTrue(Arrays.equals(testSensorConfigBytes, readSensorConfigBytes));
-
String name = "testConfig";
Map<String, Object> testConfig = new HashMap<>();
testConfig.put("stringField", "value");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 9b47d71..ee79a70 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -37,17 +37,10 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
protected static final Logger LOG = LoggerFactory
.getLogger(EnrichmentJoinBolt.class);
- private List<Enrichment> enrichments;
-
public EnrichmentJoinBolt(String zookeeperUrl) {
super(zookeeperUrl);
}
- public EnrichmentJoinBolt withEnrichments(List<Enrichment> enrichments) {
- this.enrichments = enrichments;
- return this;
- }
-
@Override
public void prepare(Map map, TopologyContext topologyContext) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
new file mode 100644
index 0000000..ff69b58
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest { // Tests EnrichmentJoinBolt.getStreamIds() and joinMessages().
+ // Message put on the "enriched" stream. The Javadoc below is test data: the Multiline annotation uses it as the field's value.
+ /**
+ * {
+ * "enrichedField": "enrichedValue",
+ * "emptyEnrichedField": ""
+ * }
+ */
+ @Multiline
+ private String enrichedMessageString;
+ // Expected join output. "emptyEnrichedField" is absent, so the test expects the empty value to be dropped; other keys presumably come from sampleMessage (defined outside this class).
+ /**
+ * {
+ * "ip_src_addr": "ip1",
+ * "ip_dst_addr": "ip2",
+ * "source.type": "yaf",
+ * "enrichedField": "enrichedValue"
+ * }
+ */
+ @Multiline
+ private String expectedJoinedMessageString;
+ // Parsed forms of the two JSON strings above; rebuilt before every test by parseMessages().
+ private JSONObject enrichedMessage;
+ private JSONObject expectedJoinedMessage;
+ /** Parses the Multiline-annotated JSON strings into JSONObjects before each test. */
+ @Before
+ public void parseMessages() throws ParseException {
+ JSONParser parser = new JSONParser();
+ enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
+ expectedJoinedMessage = (JSONObject) parser.parse(expectedJoinedMessageString);
+ }
+ /** Checks the stream ids reported for the sample message and the result of joining the "message" and "enriched" streams. */
+ @Test
+ public void test() throws IOException {
+ EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl");
+ enrichmentJoinBolt.setCuratorFramework(client);
+ enrichmentJoinBolt.setTreeCache(cache);
+ enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ enrichmentJoinBolt.withMaxCacheSize(100);
+ enrichmentJoinBolt.withMaxTimeRetain(10000);
+ enrichmentJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+ Set<String> actualStreamIds = enrichmentJoinBolt.getStreamIds(sampleMessage);
+ streamIds.add("message"); // the bolt is expected to report the original "message" stream in addition to the inherited streamIds
+ Assert.assertEquals(streamIds, actualStreamIds);
+ Map<String, JSONObject> streamMessageMap = new HashMap<>();
+ streamMessageMap.put("message", sampleMessage);
+ streamMessageMap.put("enriched", enrichedMessage);
+ JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap);
+ removeTimingFields(joinedMessage); // helper defined outside this class; presumably strips timestamp keys so the comparison is deterministic — confirm
+ Assert.assertEquals(expectedJoinedMessage, joinedMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
new file mode 100644
index 0000000..40ae25d
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+
+public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest { // Tests EnrichmentSplitterBolt key, message, stream-id and split logic.
+ /** Exercises getKey, generateMessage, getStreamIds and splitMessage on a bolt configured with geo, host and hbaseEnrichment. */
+ @Test
+ public void test() throws ParseException, IOException {
+ final Enrichment geo = new Enrichment();
+ geo.setType("geo");
+ final Enrichment host = new Enrichment();
+ host.setType("host");
+ final Enrichment hbaseEnrichment = new Enrichment();
+ hbaseEnrichment.setType("hbaseEnrichment");
+ List<Enrichment> enrichments = new ArrayList<Enrichment>() {{
+ add(geo);
+ add(host);
+ add(hbaseEnrichment);
+ }};
+ // Build the bolt with the inherited client/cache fixtures and load the sample sensor enrichment config.
+ EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
+ enrichmentSplitterBolt.setCuratorFramework(client);
+ enrichmentSplitterBolt.setTreeCache(cache);
+ enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+ // With no "key" stubbed on the tuple, a 36-character key is expected (the length of a UUID string — presumably generated; confirm).
+ String key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
+ Assert.assertTrue(key != null && key.length() == 36);
+ String someKey = "someKey";
+ when(tuple.getStringByField("key")).thenReturn(someKey); // once the tuple carries a "key" field, that value must be returned unchanged
+ key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
+ Assert.assertEquals(someKey, key);
+ when(tuple.getBinary(0)).thenReturn(sampleMessageString.getBytes()); // first case: message supplied as the tuple's binary field 0
+ JSONObject generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
+ removeTimingFields(generatedMessage);
+ Assert.assertEquals(sampleMessage, generatedMessage);
+ String messageFieldName = "messageFieldName";
+ enrichmentSplitterBolt.withMessageFieldName(messageFieldName); // second case: message supplied through a named tuple field
+ when(tuple.getValueByField(messageFieldName)).thenReturn(sampleMessage);
+ generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
+ Assert.assertEquals(sampleMessage, generatedMessage);
+ Set<String> actualStreamIds = enrichmentSplitterBolt.getStreamIds();
+ Assert.assertEquals(streamIds, actualStreamIds);
+ // splitMessage is expected to yield one sub-message per configured enrichment type.
+ Map<String, JSONObject> actualSplitMessages = enrichmentSplitterBolt.splitMessage(sampleMessage);
+ Assert.assertEquals(3, actualSplitMessages.size());
+ Assert.assertEquals(geoMessage, actualSplitMessages.get("geo"));
+ Assert.assertEquals(hostMessage, actualSplitMessages.get("host"));
+ Assert.assertEquals(hbaseEnrichmentMessage, actualSplitMessages.get("hbaseEnrichment"));
+
+
+ }
+ /** Removes, in place, every entry of the message whose key contains "splitter.begin.ts". */
+ public void removeTimingFields(JSONObject message) {
+ ImmutableSet keys = ImmutableSet.copyOf(message.keySet()); // snapshot the keys so entries can be removed while iterating
+ for(Object key: keys) {
+ if (key.toString().contains("splitter.begin.ts")) {
+ message.remove(key);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
new file mode 100644
index 0000000..f195f5e
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import backtype.storm.tuple.Values;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { // Tests GenericEnrichmentBolt: prepare() validation, declared streams and execute() output.
+ /** Mockito matcher for emitted Values: element 0 is compared with the expected key, element 1 with the expected message. */
+ protected class EnrichedMessageMatcher extends ArgumentMatcher<Values> {
+
+ private String expectedKey;
+ private JSONObject expectedMessage;
+
+ public EnrichedMessageMatcher(String expectedKey, JSONObject expectedMessage) {
+ this.expectedKey = expectedKey;
+ this.expectedMessage = expectedMessage;
+ }
+ /** Matches when both key and message equal the expected ones; removeTimingFields is applied to the actual message first, mutating it. */
+ @Override
+ public boolean matches(Object o) {
+ Values values = (Values) o;
+ String actualKey = (String) values.get(0);
+ JSONObject actualMessage = (JSONObject) values.get(1);
+ removeTimingFields(actualMessage); // helper defined outside this class
+ return expectedKey.equals(actualKey) && expectedMessage.equals(actualMessage);
+ }
+ /** Describes the expectation using the expected message only; the expected key is not included. */
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("[%s]", expectedMessage));
+ }
+
+ }
+ // Input message handed to execute(). The Javadoc below is test data: the Multiline annotation uses it as the field's value.
+ /**
+ {
+ "field1": "value1",
+ "field2": "value2",
+ "source.type": "yaf"
+ }
+ */
+ @Multiline
+ private String originalMessageString;
+ // Result the mocked adapter is stubbed to return for field1/value1.
+ /**
+ {
+ "enrichedField1": "enrichedValue1"
+ }
+ */
+ @Multiline
+ private String enrichedField1String;
+ // Result the mocked adapter is stubbed to return for field2/value2.
+ /**
+ {
+ "enrichedField2": "enrichedValue2"
+ }
+ */
+ @Multiline
+ private String enrichedField2String;
+ // Message expected on the enrichment stream at the end of test(): enrichment keys are prefixed with the originating field name.
+ /**
+ {
+ "field1.enrichedField1": "enrichedValue1",
+ "field2.enrichedField2": "enrichedValue2",
+ "source.type": "yaf"
+ }
+ */
+ @Multiline
+ private String enrichedMessageString;
+ // Parsed forms of the JSON strings above; rebuilt before every test by parseMessages().
+ private JSONObject originalMessage;
+ private JSONObject enrichedField1;
+ private JSONObject enrichedField2;
+ private JSONObject enrichedMessage;
+ /** Parses the Multiline-annotated JSON strings into JSONObjects before each test. */
+ @Before
+ public void parseMessages() throws ParseException {
+ JSONParser parser = new JSONParser();
+ originalMessage = (JSONObject) parser.parse(originalMessageString);
+ enrichedField1 = (JSONObject) parser.parse(enrichedField1String);
+ enrichedField2 = (JSONObject) parser.parse(enrichedField2String);
+ enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
+ }
+ // Mocked adapter; stubbed and verified inside test().
+ @Mock
+ public EnrichmentAdapter<CacheKey> enrichmentAdapter;
+ /** Initialises the Mock-annotated fields reachable from this instance before each test. */
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+ /** Covers prepare() precondition failures, stream declaration, the error path for a missing key, and a successful enrichment. */
+ @Test
+ public void test() throws IOException {
+ String key = "someKey";
+ String enrichmentType = "enrichmentType";
+ Enrichment<EnrichmentAdapter<CacheKey>> testEnrichment = new Enrichment<>();
+ testEnrichment.setType(enrichmentType);
+ testEnrichment.setAdapter(enrichmentAdapter);
+ GenericEnrichmentBolt genericEnrichmentBolt = new GenericEnrichmentBolt("zookeeperUrl");
+ genericEnrichmentBolt.setCuratorFramework(client);
+ genericEnrichmentBolt.setTreeCache(cache);
+ genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ try {
+ genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("Should fail if a maxCacheSize property is not set");
+ } catch(IllegalStateException e) {} // expected: the exception is the pass condition
+ genericEnrichmentBolt.withMaxCacheSize(100);
+ try {
+ genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("Should fail if a maxTimeRetain property is not set");
+ } catch(IllegalStateException e) {} // expected: the exception is the pass condition
+ genericEnrichmentBolt.withMaxTimeRetain(10000);
+ try {
+ genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("Should fail if an adapter is not set");
+ } catch(IllegalStateException e) {} // expected: the exception is the pass condition
+ genericEnrichmentBolt.withEnrichment(testEnrichment);
+ when(enrichmentAdapter.initializeAdapter()).thenReturn(true);
+ genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(enrichmentAdapter, times(1)).initializeAdapter(); // the first fully configured prepare() must initialise the adapter exactly once
+ when(enrichmentAdapter.initializeAdapter()).thenReturn(false);
+ try {
+ genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ fail("An exception should be thrown if enrichment adapter initialization fails");
+ } catch(IllegalStateException e) {} // expected: the exception is the pass condition
+ genericEnrichmentBolt.declareOutputFields(declarer);
+ verify(declarer, times(1)).declareStream(eq(enrichmentType), argThat(new FieldsMatcher("key", "message")));
+ verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
+ when(tuple.getStringByField("key")).thenReturn(null); // a tuple without a key is expected to be routed to the "error" stream
+ genericEnrichmentBolt.execute(tuple);
+ verify(outputCollector, times(1)).emit(eq("error"), any(Values.class));
+ when(tuple.getStringByField("key")).thenReturn(key);
+ when(tuple.getValueByField("message")).thenReturn(originalMessage);
+ genericEnrichmentBolt.execute(tuple);
+ verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject()))); // enrich() is not stubbed yet, so an empty message is expected
+ reset(enrichmentAdapter); // clear stubbing and recorded calls before the second, fully stubbed execute()
+ // Stub one enrichment result per original field, then expect both results in the emitted message.
+ SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.
+ fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot).get(sensorType));
+ CacheKey cacheKey1 = new CacheKey("field1", "value1", sensorEnrichmentConfig);
+ CacheKey cacheKey2 = new CacheKey("field2", "value2", sensorEnrichmentConfig);
+ when(enrichmentAdapter.enrich(cacheKey1)).thenReturn(enrichedField1);
+ when(enrichmentAdapter.enrich(cacheKey2)).thenReturn(enrichedField2);
+ genericEnrichmentBolt.execute(tuple);
+ verify(enrichmentAdapter, times(1)).logAccess(cacheKey1);
+ verify(enrichmentAdapter, times(1)).logAccess(cacheKey2);
+ verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, enrichedMessage)));
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
new file mode 100644
index 0000000..a065d2d
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import junit.framework.Assert;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest { // Tests ThreatIntelJoinBolt.getFieldMap() and the "is_alert" flag set by joinMessages().
+ // Message with no threat intel keys. The Javadoc below is test data: the Multiline annotation uses it as the field's value.
+ /**
+ {
+ "field1": "value1",
+ "enrichedField1": "enrichedValue1",
+ "source.type": "yaf"
+ }
+ */
+ @Multiline
+ private String messageString;
+ // Same message plus a "threatintels.field.end.ts" timing entry; the test expects this not to be flagged as an alert.
+ /**
+ {
+ "field1": "value1",
+ "enrichedField1": "enrichedValue1",
+ "source.type": "yaf",
+ "threatintels.field.end.ts": "timing"
+ }
+ */
+ @Multiline
+ private String messageWithTimingString;
+ // Message carrying a "threatintels.field" value; the test expects this one to be flagged as an alert.
+ /**
+ {
+ "field1": "value1",
+ "enrichedField1": "enrichedValue1",
+ "source.type": "yaf",
+ "threatintels.field": "threatIntelValue"
+ }
+ */
+ @Multiline
+ private String alertMessageString;
+ // Parsed forms of the JSON strings above; rebuilt before every test by parseMessages().
+ private JSONObject message;
+ private JSONObject messageWithTiming;
+ private JSONObject alertMessage;
+ /** Parses the Multiline-annotated JSON strings into JSONObjects before each test. */
+ @Before
+ public void parseMessages() throws ParseException {
+ JSONParser parser = new JSONParser();
+ message = (JSONObject) parser.parse(messageString);
+ messageWithTiming = (JSONObject) parser.parse(messageWithTimingString);
+ alertMessage = (JSONObject) parser.parse(alertMessageString);
+ }
+ /** Checks getFieldMap for unknown and known sensor types, then joins three messages and asserts "is_alert" appears only for the alert message. */
+ @Test
+ public void test() throws IOException {
+ ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
+ threatIntelJoinBolt.setCuratorFramework(client);
+ threatIntelJoinBolt.setTreeCache(cache);
+ threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ threatIntelJoinBolt.withMaxCacheSize(100);
+ threatIntelJoinBolt.withMaxTimeRetain(10000);
+ threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+ Map<String, List<String>> fieldMap = threatIntelJoinBolt.getFieldMap("incorrectSourceType");
+ Assert.assertNull(fieldMap); // an unknown sensor type is expected to have no field map
+ fieldMap = threatIntelJoinBolt.getFieldMap(sensorType);
+ Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel"));
+ Map<String, JSONObject> streamMessageMap = new HashMap<>();
+ streamMessageMap.put("message", message);
+ JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+ Assert.assertFalse(joinedMessage.containsKey("is_alert"));
+ streamMessageMap.put("message", messageWithTiming); // replaces the previous "message" entry
+ joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+ Assert.assertFalse(joinedMessage.containsKey("is_alert"));
+ streamMessageMap.put("message", alertMessage);
+ joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap);
+ Assert.assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert"))); // the flag is expected as the string "true", not a Boolean
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
new file mode 100644
index 0000000..1c16f83
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import junit.framework.Assert;
+import org.apache.metron.bolt.BaseEnrichmentBoltTest;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest { // Tests ThreatIntelSplitterBolt.getFieldMap() and getKeyName().
+ /** Loads the sample sensor enrichment config, then checks the field map contains the threat intel type and the generated key name. */
+ @Test
+ public void test() throws IOException {
+ String threatIntelType = "hbaseThreatIntel";
+ ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl");
+ threatIntelSplitterBolt.setCuratorFramework(client);
+ threatIntelSplitterBolt.setTreeCache(cache);
+ threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+ threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+ Map<String, List<String>> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType);
+ Assert.assertTrue(fieldMap.containsKey(threatIntelType));
+ String fieldName = threatIntelSplitterBolt.getKeyName(threatIntelType, "field");
+ Assert.assertEquals("threatintels.hbaseThreatIntel.field", fieldName); // for these inputs the expected name is "threatintels." + type + "." + field
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index 2687213..108447a 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -93,6 +93,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-TestingUtilities</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<reporting>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
new file mode 100644
index 0000000..856bc1f
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/bolt/ParserBoltTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.parser.interfaces.MessageFilter;
+import org.apache.metron.parser.interfaces.MessageParser;
+import org.apache.metron.writer.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ParserBoltTest extends BaseBoltTest { // Tests ParserBolt.prepare() and execute() with a mocked parser, writer and filter.
+ // No mock initialisation happens in this class; the Mock-annotated fields are presumably set up by BaseBoltTest — confirm.
+ @Mock
+ private MessageParser<JSONObject> parser;
+
+ @Mock
+ private MessageWriter<JSONObject> writer;
+
+ @Mock
+ private MessageFilter<JSONObject> filter;
+ /** Runs three execute() calls: one dropping a message that fails validation, one dropping a message via the filter, and one where the writer throws. */
+ @Test
+ public void test() throws Exception {
+ String sensorType = "yaf";
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer);
+ parserBolt.setCuratorFramework(client);
+ parserBolt.setTreeCache(cache);
+ parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(parser, times(1)).init(); // prepare() must initialise the parser and the writer once each
+ verify(writer, times(1)).init();
+ byte[] sampleBinary = "some binary message".getBytes();
+ JSONParser jsonParser = new JSONParser();
+ final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
+ final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
+ List<JSONObject> messages = new ArrayList<JSONObject>() {{
+ add(sampleMessage1);
+ add(sampleMessage2);
+ }};
+ final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }"); // expected written form: parser output plus "source.type"
+ final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\" }");
+ when(tuple.getBinary(0)).thenReturn(sampleBinary);
+ when(parser.parse(sampleBinary)).thenReturn(messages);
+ when(parser.validate(eq(messages.get(0)))).thenReturn(true);
+ when(parser.validate(eq(messages.get(1)))).thenReturn(false); // second message fails validation, so only the first should be written
+ parserBolt.execute(tuple);
+ verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
+ verify(outputCollector, times(1)).ack(tuple);
+ when(parser.validate(eq(messages.get(0)))).thenReturn(true);
+ when(parser.validate(eq(messages.get(1)))).thenReturn(true);
+ when(filter.emitTuple(messages.get(0))).thenReturn(false); // the filter now rejects the first message and accepts the second
+ when(filter.emitTuple(messages.get(1))).thenReturn(true);
+ parserBolt.withMessageFilter(filter);
+ parserBolt.execute(tuple);
+ verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2));
+ verify(outputCollector, times(2)).ack(tuple); // ack count is cumulative across both execute() calls
+ doThrow(new Exception()).when(writer).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage2)); // make the writer fail on the next execute()
+ parserBolt.execute(tuple);
+ verify(outputCollector, times(1)).reportError(any(Throwable.class)); // the writer failure must be reported to the collector
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/pom.xml b/metron-streaming/Metron-TestingUtilities/pom.xml
index 1c67e7c..d19c0b2 100644
--- a/metron-streaming/Metron-TestingUtilities/pom.xml
+++ b/metron-streaming/Metron-TestingUtilities/pom.xml
@@ -35,8 +35,11 @@
<artifactId>junit</artifactId>
<version>${global_junit_version}</version>
</dependency>
-
-
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${global_json_simple_version}</version>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -64,6 +67,47 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${global_mockito_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <version>2.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.adrianwalker</groupId>
+ <artifactId>multiline-string</artifactId>
+ <version>0.1.2</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
new file mode 100644
index 0000000..cb928ff
--- /dev/null
+++ b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseBoltTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class BaseBoltTest {
+
+ @Mock
+ protected TopologyContext topologyContext;
+
+ @Mock
+ protected OutputCollector outputCollector;
+
+ @Mock
+ protected Tuple tuple;
+
+ @Mock
+ protected OutputFieldsDeclarer declarer;
+
+ @Mock
+ protected CuratorFramework client;
+
+ @Mock
+ protected TreeCache cache;
+
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ protected class FieldsMatcher extends ArgumentMatcher<Fields> {
+
+ private List<String> expectedFields;
+
+ public FieldsMatcher(String... fields) {
+ this.expectedFields = Arrays.asList(fields);
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ Fields fields = (Fields) o;
+ return expectedFields.equals(fields.toList());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(String.format("[%s]", Joiner.on(",").join(expectedFields)));
+ }
+
+ }
+
+ public void removeTimingFields(JSONObject message) {
+ ImmutableSet keys = ImmutableSet.copyOf(message.keySet());
+ for (Object key : keys) {
+ if (key.toString().endsWith(".ts")) {
+ message.remove(key);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
new file mode 100644
index 0000000..9ab9b23
--- /dev/null
+++ b/metron-streaming/Metron-TestingUtilities/src/main/java/org/apache/metron/bolt/BaseEnrichmentBoltTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Shared fixtures for enrichment bolt tests: sample configuration paths,
+ * sample JSON messages and the enrichment stream ids.
+ *
+ * <p>The {@code @Multiline} fields are never assigned in this class; their
+ * values come from the Javadoc comment placed directly above each of them
+ * (multiline-string annotation processing). Those comments are therefore data
+ * and must not be reworded.
+ */
+public class BaseEnrichmentBoltTest extends BaseBoltTest {
+
+  public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+  public String sampleSensorEnrichmentConfigPath = sampleConfigRoot + "sensors/yaf.json";
+  protected Set<String> streamIds = new HashSet<>();
+  protected String key = "someKey";
+  protected String sensorType = "yaf";
+
+  /**
+   * {
+   * "ip_src_addr": "ip1",
+   * "ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String sampleMessageString;
+
+  /**
+   * {
+   * "enrichments.geo.ip_src_addr": "ip1",
+   * "enrichments.geo.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String geoMessageString;
+
+  /**
+   * {
+   * "enrichments.host.ip_src_addr": "ip1",
+   * "enrichments.host.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String hostMessageString;
+
+  /**
+   * {
+   * "enrichments.hbaseEnrichment.ip_src_addr": "ip1",
+   * "enrichments.hbaseEnrichment.ip_dst_addr": "ip2",
+   * "source.type": "yaf"
+   * }
+   */
+  @Multiline
+  protected String hbaseEnrichmentMessageString;
+
+  // Parsed forms of the strings above, rebuilt before every test.
+  protected JSONObject sampleMessage;
+  protected JSONObject geoMessage;
+  protected JSONObject hostMessage;
+  protected JSONObject hbaseEnrichmentMessage;
+
+  /**
+   * Parses the four sample message strings into JSON objects and then
+   * registers the "geo", "host" and "hbaseEnrichment" stream ids.
+   *
+   * @throws ParseException if any of the sample strings is not valid JSON
+   */
+  @Before
+  public void parseBaseMessages() throws ParseException {
+    final JSONParser jsonParser = new JSONParser();
+    sampleMessage = toJson(jsonParser, sampleMessageString);
+    geoMessage = toJson(jsonParser, geoMessageString);
+    hostMessage = toJson(jsonParser, hostMessageString);
+    hbaseEnrichmentMessage = toJson(jsonParser, hbaseEnrichmentMessageString);
+
+    // Stream ids are registered only after every message parsed successfully.
+    for (String streamId : new String[] {"geo", "host", "hbaseEnrichment"}) {
+      streamIds.add(streamId);
+    }
+  }
+
+  /** Parses one JSON document with the supplied parser and returns it as a JSON object. */
+  private static JSONObject toJson(JSONParser parser, String json) throws ParseException {
+    return (JSONObject) parser.parse(json);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index ddf0d9e..ef2143d 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -239,9 +239,6 @@ bolts:
constructorArgs:
- "${kafka.zk}"
configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
@@ -275,9 +272,6 @@ bolts:
constructorArgs:
- "${kafka.zk}"
configMethods:
- - name: "withEnrichments"
- args:
- - ref: "threatIntels"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/25e73266/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
index 60cce7d..c39d3e7 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
@@ -221,9 +221,6 @@ bolts:
constructorArgs:
- "${kafka.zk}"
configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"
@@ -257,9 +254,6 @@ bolts:
constructorArgs:
- "${kafka.zk}"
configMethods:
- - name: "withEnrichments"
- args:
- - ref: "threatIntels"
- name: "withMaxCacheSize"
args: [10000]
- name: "withMaxTimeRetain"