You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/04/12 02:05:25 UTC
incubator-metron git commit: METRON-101 Unit Tests for
configuration-related classes (merrimanr via nickwallen) closes
apache/incubator-metron#71
Repository: incubator-metron
Updated Branches:
refs/heads/master 9ec1ef20b -> 8d5fb1bbc
METRON-101 Unit Tests for configuration-related classes (merrimanr via nickwallen) closes apache/incubator-metron#71
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/8d5fb1bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/8d5fb1bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/8d5fb1bb
Branch: refs/heads/master
Commit: 8d5fb1bbcd05a66c6c33475341b5216459b90a73
Parents: 9ec1ef2
Author: merrimanr <me...@gmail.com>
Authored: Mon Apr 11 20:00:15 2016 -0400
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Apr 11 20:00:15 2016 -0400
----------------------------------------------------------------------
metron-streaming/Metron-Common/pom.xml | 24 ++-
.../org/apache/metron/bolt/ConfiguredBolt.java | 24 ++-
.../apache/metron/domain/Configurations.java | 36 ++++-
.../metron/domain/SensorEnrichmentConfig.java | 13 +-
.../metron/utils/ConfigurationsUtils.java | 59 +++++---
.../org/apache/metron/bolt/BaseBoltTest.java | 48 ++++++
.../apache/metron/bolt/ConfiguredBoltTest.java | 149 +++++++++++++++++++
.../metron/domain/ConfigurationsTest.java | 39 +++++
.../domain/SensorEnrichmentConfigTest.java | 42 ++++++
.../metron/utils/ConfigurationsUtilsTest.java | 95 ++++++++++++
.../enrichment/bolt/GenericEnrichmentBolt.java | 3 +-
.../SolrEnrichmentIntegrationTest.java | 2 +-
12 files changed, 484 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 22460d0..80b6a17 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -63,12 +63,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>${global_slf4j_version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>${global_kafka_version}</version>
@@ -182,6 +176,24 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.7.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${global_mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 3d22fe9..40a1f08 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -40,7 +40,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
private String zookeeperUrl;
- private long timeout = Constants.DEFAULT_CONFIGURED_BOLT_TIMEOUT;
protected final Configurations configurations = new Configurations();
private CuratorFramework client;
@@ -50,21 +49,15 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
this.zookeeperUrl = zookeeperUrl;
}
- protected void reloadCallback() {
- }
-
- public ConfiguredBolt withTimeout(long timeout) {
- this.timeout = timeout;
- return this;
+ protected void reloadCallback(String name, Configurations.Type type) {
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
- client.start();
try {
- ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ client.start();
cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
TreeCacheListener listener = new TreeCacheListener() {
@Override
@@ -73,7 +66,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
String path = event.getData().getPath();
byte[] data = event.getData().getData();
updateConfig(path, data);
- reloadCallback();
}
}
};
@@ -84,7 +76,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
catch(Exception e) {
LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
}
-
cache.start();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -93,15 +84,20 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
}
public void updateConfig(String path, byte[] data) throws IOException {
- if (data.length != 0 && path != null) {
+ if (data.length != 0) {
String name = path.substring(path.lastIndexOf("/") + 1);
+ Configurations.Type type;
if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
configurations.updateSensorEnrichmentConfig(name, data);
+ type = Configurations.Type.SENSOR;
} else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
configurations.updateGlobalConfig(data);
+ type = Configurations.Type.GLOBAL;
} else {
configurations.updateConfig(name, data);
+ type = Configurations.Type.OTHER;
}
+ reloadCallback(name, type);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
index d93cc5f..63e0f95 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
@@ -33,6 +33,10 @@ public class Configurations implements Serializable {
private static final Logger LOG = Logger.getLogger(Configurations.class);
+ public enum Type {
+ GLOBAL, SENSOR, OTHER
+ }
+
public static final String GLOBAL_CONFIG_NAME = "global";
private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
@@ -47,7 +51,8 @@ public class Configurations implements Serializable {
}
public void updateGlobalConfig(InputStream io) throws IOException {
- Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {});
+ Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
+ });
updateGlobalConfig(globalConfig);
}
@@ -77,16 +82,31 @@ public class Configurations implements Serializable {
return (Map<String, Object>) configurations.get(name);
}
- public void updateConfig(String name, byte[] data) {
- try {
- Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>(){});
- updateConfig(name, config);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
+ public void updateConfig(String name, byte[] data) throws IOException {
+ if (data == null) throw new IllegalStateException("config data cannot be null");
+ Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
+ updateConfig(name, config);
}
public void updateConfig(String name, Map<String, Object> config) {
configurations.put(name, config);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Configurations that = (Configurations) o;
+ return configurations.equals(that.configurations);
+ }
+
+ @Override
+ public int hashCode() {
+ return configurations.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return configurations.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
index e37c6d0..ea345ca 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
@@ -83,9 +83,7 @@ public class SensorEnrichmentConfig {
public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
}
- public String toJSON(boolean pretty) throws JsonProcessingException {
- return JSONUtils.INSTANCE.toJSON(this, pretty);
- }
+
public String toJSON() throws JsonProcessingException {
return JSONUtils.INSTANCE.toJSON(this, true);
}
@@ -110,6 +108,15 @@ public class SensorEnrichmentConfig {
}
@Override
+ public String toString() {
+ return "{index=" + index + ", batchSize=" + batchSize +
+ ", enrichmentFieldMap=" + enrichmentFieldMap +
+ ", threatIntelFieldMap" + threatIntelFieldMap +
+ ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
+ ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
+ }
+
+ @Override
public int hashCode() {
int result = getIndex() != null ? getIndex().hashCode() : 0;
result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
index 45e93a1..62259df 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
@@ -49,38 +49,59 @@ public class ConfigurationsUtils {
return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
}
- public static void writeToZookeeperFromFile(String path, String filePath, String zookeeperUrl) throws Exception {
- writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+ public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+ writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
}
- public static void writeToZookeeperFromFile(String path, String filePath, CuratorFramework client) throws Exception {
- writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), client);
+ public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ writeGlobalConfigToZookeeper(globalConfig, client);
+ }
+ finally {
+ client.close();
+ }
}
- public static void writerGlobalConfigToZookeeper(byte[] configData, String zookeeperUrl) throws Exception {
- writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, zookeeperUrl);
+
+ public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+ writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
}
- public static void writerGlobalConfigToZookeeper(byte[] configData, CuratorFramework client) throws Exception {
- writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, client);
+ public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+ writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
}
+
public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
- writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, zookeeperUrl);
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+ }
+ finally {
+ client.close();
+ }
}
public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
}
- public static void writeToZookeeper(String path, byte[] configData, String zookeeperUrl) throws Exception {
+
+ public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+ writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+ }
+
+ public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
CuratorFramework client = getClient(zookeeperUrl);
client.start();
try {
- writeToZookeeper(path, configData, client);
+ writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
}
finally {
client.close();
}
-
}
+
public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
try {
client.setData().forPath(path, configData);
@@ -98,19 +119,23 @@ public class ConfigurationsUtils {
}
public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
- return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+ return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
}
public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
- return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+ return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+ }
+
+ public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+ return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
}
- public static byte[] readConfigBytesFromZookeeper(String path, CuratorFramework client) throws Exception {
+ public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
return client.getData().forPath(path);
}
public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
- ConfigurationsUtils.writerGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
for(String sensorType: sensorEnrichmentConfigs.keySet()) {
ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
@@ -160,7 +185,7 @@ public class ConfigurationsUtils {
options.addOption(o);
}
{
- Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\"-config.json");
+ Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\".json");
o.setArgName("DIR_NAME");
o.setRequired(false);
options.addOption(o);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
new file mode 100644
index 0000000..ada3854
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/BaseBoltTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.Constants;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Before;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class BaseBoltTest {
+
+ public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+ protected String zookeeperUrl;
+ protected Set<String> allConfigurationTypes = new HashSet<>();
+
+ @Before
+ public void setupConfiguration() throws Exception {
+ TestingServer testZkServer = new TestingServer(true);
+ this.zookeeperUrl = testZkServer.getConnectString();
+ byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+ allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+ Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+ for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+ allConfigurationTypes.add(sensorType);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
new file mode 100644
index 0000000..6c538f0
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/bolt/ConfiguredBoltTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+
+public class ConfiguredBoltTest extends BaseBoltTest {
+
+ private static Set<String> configsUpdated = new HashSet<>();
+
+ public static class StandAloneConfiguredBolt extends ConfiguredBolt {
+
+ public StandAloneConfiguredBolt(String zookeeperUrl) {
+ super(zookeeperUrl);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ @Override
+ protected void reloadCallback(String name, Configurations.Type type) {
+ configsUpdated.add(name);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ Configurations sampleConfigurations = new Configurations();
+ TopologyContext topologyContext = mock(TopologyContext.class);
+ OutputCollector outputCollector = mock(OutputCollector.class);
+ try {
+ StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
+ configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ Assert.fail("A valid zookeeper url must be supplied");
+ } catch (RuntimeException e){}
+
+ configsUpdated = new HashSet<>();
+ sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot));
+ Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+ for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+ sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+ }
+
+ StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(zookeeperUrl);
+ configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ waitForConfigUpdate(allConfigurationTypes);
+ Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+
+ configsUpdated = new HashSet<>();
+ Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
+ sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+ waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+ Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+
+ configsUpdated = new HashSet<>();
+ sampleGlobalConfig.remove("newGlobalField");
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+ waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+ Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+
+ configsUpdated = new HashSet<>();
+ String sensorType = "testSensorConfig";
+ SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
+ testSensorConfig.setBatchSize(50);
+ testSensorConfig.setIndex("test");
+ Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
+ enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
+ add("enrichmentField");
+ }});
+ testSensorConfig.setEnrichmentFieldMap(enrichmentFieldMap);
+ Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
+ threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
+ add("threatIntelField");
+ }});
+ testSensorConfig.setThreatIntelFieldMap(threatIntelFieldMap);
+ sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
+ waitForConfigUpdate(sensorType);
+ Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+
+ configsUpdated = new HashSet<>();
+ String someConfigType = "someConfig";
+ Map<String, Object> someConfig = new HashMap<>();
+ someConfig.put("someField", "someValue");
+ sampleConfigurations.updateConfig(someConfigType, someConfig);
+ ConfigurationsUtils.writeConfigToZookeeper(someConfigType, someConfig, zookeeperUrl);
+ waitForConfigUpdate(someConfigType);
+ Assert.assertEquals("Add new misc config", sampleConfigurations, configuredBolt.configurations);
+ configuredBolt.cleanup();
+ }
+
+ private void waitForConfigUpdate(final String expectedConfigUpdate) {
+ waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
+ }
+
+ private void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
+ int count = 0;
+ while (!configsUpdated.equals(expectedConfigUpdates)) {
+ if (count++ > 5) {
+ Assert.fail("ConfiguredBolt was not updated in time");
+ return;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
new file mode 100644
index 0000000..5d883b6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationsTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import junit.framework.Assert;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import nl.jqno.equalsverifier.Warning;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ConfigurationsTest {
+
+ @Test
+ public void test() throws IOException {
+ EqualsVerifier.forClass(Configurations.class).suppress(Warning.NONFINAL_FIELDS, Warning.NULL_FIELDS).usingGetClass().verify();
+ Configurations configurations = new Configurations();
+ try {
+ configurations.updateConfig("someConfig", (byte[]) null);
+ Assert.fail("Updating a config with null should throw an IllegalStateException");
+ } catch(IllegalStateException e) {}
+ Assert.assertTrue(configurations.toString() != null && configurations.toString().length() > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
new file mode 100644
index 0000000..ed2b854
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/SensorEnrichmentConfigTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import junit.framework.Assert;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import nl.jqno.equalsverifier.Warning;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class SensorEnrichmentConfigTest {
+
+ public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+
+ @Test
+ public void test() throws IOException {
+ EqualsVerifier.forClass(SensorEnrichmentConfig.class).suppress(Warning.NONFINAL_FIELDS).usingGetClass().verify();
+ Map<String, byte[]> testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+ byte[] sensorConfigBytes = testSensorConfigMap.get("yaf");
+ SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.fromBytes(sensorConfigBytes);
+ Assert.assertNotNull(sensorEnrichmentConfig);
+ Assert.assertTrue(sensorEnrichmentConfig.toString() != null && sensorEnrichmentConfig.toString().length() > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
new file mode 100644
index 0000000..3214f0e
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/utils/ConfigurationsUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConfigurationsUtilsTest {
+
+ public String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
+
+ private TestingServer testZkServer;
+ private String zookeeperUrl;
+ private CuratorFramework client;
+ private byte[] testGlobalConfig;
+ private Map<String, byte[]> testSensorConfigMap;
+
+ @Before
+ public void setup() throws Exception {
+ testZkServer = new TestingServer(true);
+ zookeeperUrl = testZkServer.getConnectString();
+ client = ConfigurationsUtils.getClient(zookeeperUrl);
+ client.start();
+ testGlobalConfig = ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot);
+ testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
+ }
+
+ @Test
+ public void test() throws Exception {
+ Assert.assertTrue(testGlobalConfig.length > 0);
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(testGlobalConfig, zookeeperUrl);
+ byte[] readGlobalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+ Assert.assertTrue(Arrays.equals(testGlobalConfig, readGlobalConfigBytes));
+
+ Assert.assertTrue(testSensorConfigMap.size() > 0);
+ String testSensorType = "yaf";
+ byte[] testSensorConfigBytes = testSensorConfigMap.get(testSensorType);
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(testSensorType, testSensorConfigBytes, zookeeperUrl);
+ byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(testSensorType, client);
+ Assert.assertTrue(Arrays.equals(testSensorConfigBytes, readSensorConfigBytes));
+
+ String name = "testConfig";
+ Map<String, Object> testConfig = new HashMap<>();
+ testConfig.put("stringField", "value");
+ testConfig.put("intField", 1);
+ testConfig.put("doubleField", 1.1);
+ ConfigurationsUtils.writeConfigToZookeeper(name, testConfig, zookeeperUrl);
+ byte[] readConfigBytes = ConfigurationsUtils.readConfigBytesFromZookeeper(name, client);
+ Assert.assertTrue(Arrays.equals(JSONUtils.INSTANCE.toJSON(testConfig), readConfigBytes));
+
+ }
+
+ @Test
+ public void testCmdLine() throws Exception {
+ String[] args = {"-z", zookeeperUrl, "-p", sampleConfigRoot};
+ ConfigurationsUtils.main(args);
+ byte[] readGlobalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+ Assert.assertTrue(Arrays.equals(testGlobalConfig, readGlobalConfigBytes));
+ for(String sensorType: testSensorConfigMap.keySet()) {
+ byte[] readSensorConfigBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client);
+ Assert.assertTrue(Arrays.equals(testSensorConfigMap.get(sensorType), readSensorConfigBytes));
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ client.close();
+ testZkServer.close();
+ testZkServer.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 7b76c57..180c630 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -29,6 +29,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.metron.Constants;
import org.apache.metron.bolt.ConfiguredBolt;
+import org.apache.metron.domain.Configurations;
import org.apache.metron.domain.Enrichment;
import org.apache.metron.domain.SensorEnrichmentConfig;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
@@ -114,7 +115,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
return this;
}
@Override
- protected void reloadCallback() {
+ protected void reloadCallback(String name, Configurations.Type type) {
if(invalidateCacheOnReload) {
if (cache != null) {
cache.invalidateAll();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/8d5fb1bb/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
index afeb56b..ebe445f 100644
--- a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
@@ -53,7 +53,7 @@ public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
Configurations configurations = SampleUtil.getSampleConfigs();
Map<String, Object> globalConfig = configurations.getGlobalConfig();
globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
- ConfigurationsUtils.writerGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
} catch (Exception e) {
e.printStackTrace();
}