You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/10/20 21:20:21 UTC
[3/3] metron git commit: METRON-1241: Enable the REST API to use a
cache for the zookeeper config similar to the Bolts closes
apache/incubator-metron#795
METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cc111ec9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cc111ec9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cc111ec9
Branch: refs/heads/master
Commit: cc111ec984a78db43c4df222851f59280ff5eff9
Parents: aee0184
Author: cstella <ce...@gmail.com>
Authored: Fri Oct 20 17:20:06 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Fri Oct 20 17:20:06 2017 -0400
----------------------------------------------------------------------
.../profiler/bolt/ProfileBuilderBoltTest.java | 2 +-
.../profiler/bolt/ProfileSplitterBoltTest.java | 2 +-
.../metron/rest/config/ZookeeperConfig.java | 11 +
.../service/impl/GlobalConfigServiceImpl.java | 33 ++-
.../impl/SensorEnrichmentConfigServiceImpl.java | 43 ++-
.../impl/SensorIndexingConfigServiceImpl.java | 40 +--
.../impl/SensorParserConfigServiceImpl.java | 38 ++-
.../apache/metron/rest/config/TestConfig.java | 11 +
.../GlobalConfigControllerIntegrationTest.java | 6 +-
...richmentConfigControllerIntegrationTest.java | 6 +-
...IndexingConfigControllerIntegrationTest.java | 6 +-
...orParserConfigControllerIntegrationTest.java | 19 +-
.../StormControllerIntegrationTest.java | 12 +
.../impl/GlobalConfigServiceImplTest.java | 30 +-
.../SensorEnrichmentConfigServiceImplTest.java | 99 +++----
.../SensorIndexingConfigServiceImplTest.java | 100 +++----
.../impl/SensorParserConfigServiceImplTest.java | 105 +++----
metron-platform/metron-common/pom.xml | 10 +-
.../metron/common/bolt/ConfiguredBolt.java | 54 ++--
.../common/bolt/ConfiguredEnrichmentBolt.java | 30 +-
.../common/bolt/ConfiguredIndexingBolt.java | 28 +-
.../common/bolt/ConfiguredParserBolt.java | 30 +-
.../common/bolt/ConfiguredProfilerBolt.java | 47 +--
.../common/configuration/Configurations.java | 27 +-
.../configuration/ConfigurationsUtils.java | 64 +++-
.../configuration/EnrichmentConfigurations.java | 46 ++-
.../configuration/IndexingConfigurations.java | 38 ++-
.../configuration/ParserConfigurations.java | 22 +-
.../configuration/profiler/ProfileResult.java | 8 +
.../profiler/ProfileResultExpressions.java | 7 +
.../profiler/ProfileTriageExpressions.java | 23 ++
.../configuration/profiler/ProfilerConfig.java | 7 +
.../profiler/ProfilerConfigurations.java | 11 +-
.../common/zookeeper/ConfigurationsCache.java | 44 +++
.../common/zookeeper/ZKConfigurationsCache.java | 179 +++++++++++
.../configurations/ConfigurationsUpdater.java | 152 ++++++++++
.../configurations/EnrichmentUpdater.java | 78 +++++
.../configurations/IndexingUpdater.java | 74 +++++
.../zookeeper/configurations/ParserUpdater.java | 74 +++++
.../configurations/ProfilerUpdater.java | 96 ++++++
.../zookeeper/configurations/Reloadable.java | 27 ++
.../metron-common/src/main/scripts/stellar | 2 +-
.../ZKConfigurationsCacheIntegrationTest.java | 296 +++++++++++++++++++
.../bolt/BulkMessageWriterBoltTest.java | 6 +-
.../enrichment/bolt/EnrichmentJoinBoltTest.java | 2 +-
.../bolt/EnrichmentSplitterBoltTest.java | 2 +-
.../bolt/GenericEnrichmentBoltTest.java | 2 +-
.../metron/enrichment/bolt/JoinBoltTest.java | 2 +-
.../metron/enrichment/bolt/SplitBoltTest.java | 2 +-
.../bolt/ThreatIntelJoinBoltTest.java | 2 +-
.../bolt/ThreatIntelSplitterBoltTest.java | 2 +-
.../metron/integration/utils/TestUtils.java | 22 ++
.../metron/parsers/bolt/ParserBoltTest.java | 176 +++++------
metron-platform/metron-test-utilities/pom.xml | 11 +-
.../apache/metron/test/bolt/BaseBoltTest.java | 3 +-
metron-platform/metron-zookeeper/pom.xml | 48 +++
.../metron/zookeeper/SimpleEventListener.java | 123 ++++++++
.../org/apache/metron/zookeeper/ZKCache.java | 196 ++++++++++++
metron-platform/pom.xml | 1 +
.../stellar-common/src/main/scripts/stellar | 2 +-
60 files changed, 2027 insertions(+), 612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 62be86e..21d61ab 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -147,7 +147,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL");
bolt.setCuratorFramework(client);
- bolt.setTreeCache(cache);
+ bolt.setZKCache(cache);
bolt.withPeriodDuration(10, TimeUnit.MINUTES);
bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index d51401f..beab8d5 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -140,7 +140,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
bolt.setCuratorFramework(client);
- bolt.setTreeCache(cache);
+ bolt.setZKCache(cache);
bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8"));
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
index 1f72afb..6f4656e 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.metron.rest.MetronRestConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -37,6 +39,15 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
public class ZookeeperConfig {
@Bean(initMethod = "start", destroyMethod="close")
+ public ConfigurationsCache cache(CuratorFramework client) {
+ return new ZKConfigurationsCache( client
+ , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+ , ZKConfigurationsCache.ConfiguredTypes.PARSER
+ , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+ );
+ }
+
+ @Bean(initMethod = "start", destroyMethod="close")
public CuratorFramework client(Environment environment) {
int sleepTime = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_SLEEP_TIME));
int maxRetries = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_MAX_RETRIES));
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
index e80380b..ed67994 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -17,27 +17,34 @@
*/
package org.apache.metron.rest.service.impl;
-import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.ByteArrayInputStream;
import java.util.Map;
@Service
public class GlobalConfigServiceImpl implements GlobalConfigService {
private CuratorFramework client;
+ private ConfigurationsCache cache;
+
@Autowired
- public GlobalConfigServiceImpl(CuratorFramework client) {
+ public GlobalConfigServiceImpl(CuratorFramework client, ConfigurationsCache cache) {
this.client = client;
+ this.cache = cache;
+ }
+
+ public void setCache(ConfigurationsCache cache) {
+ this.cache = cache;
}
@Override
@@ -52,16 +59,14 @@ public class GlobalConfigServiceImpl implements GlobalConfigService {
@Override
public Map<String, Object> get() throws RestException {
- Map<String, Object> globalConfig;
- try {
- byte[] globalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
- globalConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(globalConfigBytes), new TypeReference<Map<String, Object>>(){});
- } catch (KeeperException.NoNodeException e) {
- return null;
- } catch (Exception e) {
- throw new RestException(e);
- }
- return globalConfig;
+ Map<String, Object> globalConfig;
+ try {
+ EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+ globalConfig = configs.getGlobalConfig(false);
+ } catch (Exception e) {
+ throw new RestException(e.getMessage(), e);
+ }
+ return globalConfig;
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
index d4438a4..293b113 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.aggregator.Aggregators;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorEnrichmentConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -43,10 +46,13 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
private CuratorFramework client;
+ private ConfigurationsCache cache;
+
@Autowired
- public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+ public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
this.objectMapper = objectMapper;
this.client = client;
+ this.cache = cache;
}
@Override
@@ -61,38 +67,27 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
@Override
public SensorEnrichmentConfig findOne(String name) throws RestException {
- SensorEnrichmentConfig sensorEnrichmentConfig;
- try {
- sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client);
- } catch (KeeperException.NoNodeException e) {
- return null;
- } catch (Exception e) {
- throw new RestException(e);
- }
- return sensorEnrichmentConfig;
+ EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+ return configs.getSensorEnrichmentConfig(name);
}
@Override
public Map<String, SensorEnrichmentConfig> getAll() throws RestException {
- Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
- List<String> sensorNames = getAllTypes();
- for (String name : sensorNames) {
- sensorEnrichmentConfigs.put(name, findOne(name));
+ Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
+ List<String> sensorNames = getAllTypes();
+ for (String name : sensorNames) {
+ SensorEnrichmentConfig config = findOne(name);
+ if(config != null) {
+ sensorEnrichmentConfigs.put(name, config);
}
- return sensorEnrichmentConfigs;
+ }
+ return sensorEnrichmentConfigs;
}
@Override
public List<String> getAllTypes() throws RestException {
- List<String> types;
- try {
- types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot());
- } catch (KeeperException.NoNodeException e) {
- types = new ArrayList<>();
- } catch (Exception e) {
- throw new RestException(e);
- }
- return types;
+ EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+ return configs.getTypes();
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
index 9f984e0..5c73b26 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
@@ -17,20 +17,19 @@
*/
package org.apache.metron.rest.service.impl;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorIndexingConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,10 +41,13 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
private CuratorFramework client;
+ private ConfigurationsCache cache;
+
@Autowired
- public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+ public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
this.objectMapper = objectMapper;
this.client = client;
+ this.cache = cache;
}
@Override
@@ -60,16 +62,8 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
@Override
public Map<String, Object> findOne(String name) throws RestException {
- Map<String, Object> sensorIndexingConfig;
- try {
- byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client);
- sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes), new TypeReference<Map<String, Object>>(){});
- } catch (KeeperException.NoNodeException e) {
- return null;
- } catch (Exception e) {
- throw new RestException(e);
- }
- return sensorIndexingConfig;
+ IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+ return configs.getSensorIndexingConfig(name, false);
}
@Override
@@ -77,22 +71,18 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
List<String> sensorNames = getAllTypes();
for (String name : sensorNames) {
- sensorIndexingConfigs.put(name, findOne(name));
+ Map<String, Object> config = findOne(name);
+ if(config != null) {
+ sensorIndexingConfigs.put(name, config);
+ }
}
return sensorIndexingConfigs;
}
@Override
public List<String> getAllTypes() throws RestException {
- List<String> types;
- try {
- types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot());
- } catch (KeeperException.NoNodeException e) {
- types = new ArrayList<>();
- } catch (Exception e) {
- throw new RestException(e);
- }
- return types;
+ IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+ return configs.getTypes();
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index f99b41c..7e70344 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -29,13 +29,16 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
import org.reflections.Reflections;
@@ -49,17 +52,21 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
private CuratorFramework client;
+ private ConfigurationsCache cache;
+
private GrokService grokService;
+ private Map<String, String> availableParsers;
+
@Autowired
public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client,
- GrokService grokService) {
+ GrokService grokService, ConfigurationsCache cache) {
this.objectMapper = objectMapper;
this.client = client;
this.grokService = grokService;
+ this.cache = cache;
}
- private Map<String, String> availableParsers;
@Override
public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException {
@@ -74,15 +81,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
@Override
public SensorParserConfig findOne(String name) throws RestException {
- SensorParserConfig sensorParserConfig;
- try {
- sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client);
- } catch (KeeperException.NoNodeException e) {
- return null;
- } catch (Exception e) {
- throw new RestException(e);
- }
- return sensorParserConfig;
+ ParserConfigurations configs = cache.get( ParserConfigurations.class);
+ return configs.getSensorParserConfig(name);
}
@Override
@@ -90,7 +90,10 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
List<SensorParserConfig> sensorParserConfigs = new ArrayList<>();
List<String> sensorNames = getAllTypes();
for (String name : sensorNames) {
- sensorParserConfigs.add(findOne(name));
+ SensorParserConfig config = findOne(name);
+ if(config != null) {
+ sensorParserConfigs.add(config);
+ }
}
return sensorParserConfigs;
}
@@ -109,15 +112,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
@Override
public List<String> getAllTypes() throws RestException {
- List<String> types;
- try {
- types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
- } catch (KeeperException.NoNodeException e) {
- types = new ArrayList<>();
- } catch (Exception e) {
- throw new RestException(e);
- }
- return types;
+ ParserConfigurations configs = cache.get( ParserConfigurations.class);
+ return configs.getTypes();
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index ea64fbe..1150189 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -36,6 +36,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
@@ -75,6 +77,15 @@ public class TestConfig {
return new KafkaComponent().withTopologyProperties(zkProperties);
}
+ @Bean(initMethod = "start", destroyMethod="close")
+ public ConfigurationsCache cache(CuratorFramework client) {
+ return new ZKConfigurationsCache( client
+ , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+ , ZKConfigurationsCache.ConfiguredTypes.PARSER
+ , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+ );
+ }
+
@Bean(destroyMethod = "stop")
public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) {
ComponentRunner runner = new ComponentRunner.Builder()
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
index f4e18ea..abb75b1 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -97,9 +98,10 @@ public class GlobalConfigControllerIntegrationTest {
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
- this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
+ assertEventually(() -> this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
.andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ );
this.mockMvc.perform(get(globalConfigUrl).with(httpBasic(user,password)))
.andExpect(status().isOk());
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
index dd4eff7..15a2370 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -167,7 +168,7 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
.andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
.andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
- this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+ assertEventually(() -> this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
.andExpect(jsonPath("$.enrichment.fieldMap.geo[0]").value("ip_dst_addr"))
@@ -183,7 +184,8 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
.andExpect(jsonPath("$.threatIntel.fieldToTypeMap.ip_dst_addr[0]").value("malicious_ip"))
.andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].rule").value("ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'"))
.andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
- .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
+ .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX") )
+ );
this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user,password)))
.andExpect(status().isOk())
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
index cebcde6..674c55a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -103,11 +104,12 @@ public class SensorIndexingConfigControllerIntegrationTest {
.andExpect(jsonPath("$.index").value("broTest"))
.andExpect(jsonPath("$.batchSize").value(1));
- this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+ assertEventually(() -> this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
.andExpect(jsonPath("$.index").value("broTest"))
- .andExpect(jsonPath("$.batchSize").value(1));
+ .andExpect(jsonPath("$.batchSize").value(1))
+ );
this.mockMvc.perform(get(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user,password)))
.andExpect(status().isOk())
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
index 6e2d788..d8aea72 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
@@ -38,7 +38,9 @@ import org.springframework.web.context.WebApplicationContext;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
@@ -198,16 +200,16 @@ public class SensorParserConfigControllerIntegrationTest {
this.sensorParserConfigService.delete("broTest");
this.sensorParserConfigService.delete("squidTest");
Method[] method = SensorParserConfig.class.getMethods();
- int numFields = 0;
+ final AtomicInteger numFields = new AtomicInteger(0);
for(Method m : method) {
if(m.getName().startsWith("set")) {
- numFields++;
+ numFields.set(numFields.get() + 1);
}
}
this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.*", hasSize(numFields)))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
.andExpect(jsonPath("$.sensorTopic").value("squidTest"))
.andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -219,10 +221,10 @@ public class SensorParserConfigControllerIntegrationTest {
.andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
.andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
- this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
+ assertEventually(() -> this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.*", hasSize(numFields)))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
.andExpect(jsonPath("$.sensorTopic").value("squidTest"))
.andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -232,7 +234,8 @@ public class SensorParserConfigControllerIntegrationTest {
.andExpect(jsonPath("$.fieldTransformations[0].output[0]").value("full_hostname"))
.andExpect(jsonPath("$.fieldTransformations[0].output[1]").value("domain_without_subdomains"))
.andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
- .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
+ .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"))
+ );
this.mockMvc.perform(get(sensorParserConfigUrl).with(httpBasic(user,password)))
.andExpect(status().isOk())
@@ -251,7 +254,7 @@ public class SensorParserConfigControllerIntegrationTest {
this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.*", hasSize(numFields)))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
.andExpect(jsonPath("$.sensorTopic").value("broTest"))
.andExpect(jsonPath("$.readMetadata").value("true"))
@@ -261,7 +264,7 @@ public class SensorParserConfigControllerIntegrationTest {
this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.*", hasSize(numFields)))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
.andExpect(jsonPath("$.sensorTopic").value("broTest"))
.andExpect(jsonPath("$.readMetadata").value("true"))
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
index 5c6dd12..e3518ca 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
@@ -18,9 +18,11 @@
package org.apache.metron.rest.controller;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.metron.rest.service.SensorParserConfigService;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -169,6 +171,11 @@ public class StormControllerIntegrationTest {
.andExpect(jsonPath("$.message").value(TopologyStatusCode.GLOBAL_CONFIG_MISSING.name()));
globalConfigService.save(globalConfig);
+ {
+ final Map<String, Object> expectedGlobalConfig = globalConfig;
+ //we must wait for the config to find its way into the config.
+ TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+ }
this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
.andExpect(status().isOk())
@@ -179,6 +186,11 @@ public class StormControllerIntegrationTest {
sensorParserConfig.setParserClassName("org.apache.metron.parsers.bro.BasicBroParser");
sensorParserConfig.setSensorTopic("broTest");
sensorParserConfigService.save(sensorParserConfig);
+ {
+ final Map<String, Object> expectedGlobalConfig = globalConfig;
+ //we must wait for the config to find its way into the config.
+ TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+ }
this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
.andExpect(status().isOk())
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
index 824fb4b..85a66b3 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
@@ -28,11 +28,15 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.zookeeper.KeeperException;
@@ -49,11 +53,13 @@ public class GlobalConfigServiceImplTest {
CuratorFramework curatorFramework;
GlobalConfigService globalConfigService;
+ ConfigurationsCache cache;
@Before
public void setUp() throws Exception {
curatorFramework = mock(CuratorFramework.class);
- globalConfigService = new GlobalConfigServiceImpl(curatorFramework);
+ cache = mock(ConfigurationsCache.class);
+ globalConfigService = new GlobalConfigServiceImpl(curatorFramework, cache);
}
@@ -98,25 +104,19 @@ public class GlobalConfigServiceImplTest {
put("k", "v");
}};
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes());
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(ConfigurationType.GLOBAL.getTypeName(), configMap);
+ }
+ };
+ when(cache.get( eq(EnrichmentConfigurations.class)))
+ .thenReturn(configs);
assertEquals(configMap, globalConfigService.get());
}
@Test
- public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- assertNull(globalConfigService.get());
- }
-
- @Test
public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception {
exception.expect(RestException.class);
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
index c26a210..0a78f4a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
@@ -18,6 +18,7 @@
package org.apache.metron.rest.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,9 +26,11 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorEnrichmentConfigService;
import org.apache.zookeeper.KeeperException;
@@ -40,6 +43,7 @@ import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -79,11 +83,14 @@ public class SensorEnrichmentConfigServiceImplTest {
@Multiline
public static String broJson;
+ ConfigurationsCache cache;
+
@Before
public void setUp() throws Exception {
objectMapper = mock(ObjectMapper.class);
curatorFramework = mock(CuratorFramework.class);
- sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework);
+ cache = mock(ConfigurationsCache.class);
+ sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache);
}
@@ -125,84 +132,54 @@ public class SensorEnrichmentConfigServiceImplTest {
public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+ }
+ };
+ when(cache.get(eq(EnrichmentConfigurations.class)))
+ .thenReturn(configs);
+ //We only have bro, so we should expect it to be returned
assertEquals(getTestSensorEnrichmentConfig(), sensorEnrichmentConfigService.findOne("bro"));
- }
-
- @Test
- public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- assertNull(sensorEnrichmentConfigService.findOne("bro"));
- }
-
- @Test
- public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
-
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- sensorEnrichmentConfigService.findOne("bro");
+ //and blah should be a miss.
+ assertNull(sensorEnrichmentConfigService.findOne("blah"));
}
@Test
public void getAllTypesShouldProperlyReturnTypes() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- add("squid");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+
+ EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), new HashMap<>()
+ ,EnrichmentConfigurations.getKey("squid"), new HashMap<>()
+ );
+ }
+ };
+ when(cache.get(eq(EnrichmentConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new ArrayList() {{
add("bro");
add("squid");
}}, sensorEnrichmentConfigService.getAllTypes());
- }
- @Test
- public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- assertEquals(new ArrayList<>(), sensorEnrichmentConfigService.getAllTypes());
}
- @Test
- public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
-
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(Exception.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- sensorEnrichmentConfigService.getAllTypes();
- }
@Test
public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+ }
+ };
+ when(cache.get( eq(EnrichmentConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new HashMap() {{ put("bro", sensorEnrichmentConfig);}}, sensorEnrichmentConfigService.getAll());
}
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
index 43ca0f7..9641a52 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.metron.rest.service.impl;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,6 +28,9 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SensorIndexingConfigService;
import org.apache.zookeeper.KeeperException;
@@ -36,6 +42,7 @@ import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -55,6 +62,7 @@ public class SensorIndexingConfigServiceImplTest {
ObjectMapper objectMapper;
CuratorFramework curatorFramework;
SensorIndexingConfigService sensorIndexingConfigService;
+ ConfigurationsCache cache;
/**
{
@@ -72,7 +80,8 @@ public class SensorIndexingConfigServiceImplTest {
public void setUp() throws Exception {
objectMapper = mock(ObjectMapper.class);
curatorFramework = mock(CuratorFramework.class);
- sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework);
+ cache = mock(ConfigurationsCache.class);
+ sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework, cache);
}
@@ -114,44 +123,36 @@ public class SensorIndexingConfigServiceImplTest {
public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ IndexingConfigurations configs = new IndexingConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig);
+ }
+ };
+ when(cache.get( eq(IndexingConfigurations.class)))
+ .thenReturn(configs);
+ //We only have bro, so we should expect it to be returned
assertEquals(getTestSensorIndexingConfig(), sensorIndexingConfigService.findOne("bro"));
+ //and blah should be a miss.
+ assertNull(sensorIndexingConfigService.findOne("blah"));
}
- @Test
- public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- assertNull(sensorIndexingConfigService.findOne("bro"));
- }
-
- @Test
- public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- sensorIndexingConfigService.findOne("bro");
- }
@Test
public void getAllTypesShouldProperlyReturnTypes() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- add("squid");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+ IndexingConfigurations configs = new IndexingConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(IndexingConfigurations.getKey("bro"), new HashMap<>()
+ ,IndexingConfigurations.getKey("squid"), new HashMap<>()
+ );
+ }
+ };
+ when(cache.get(eq(IndexingConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new ArrayList() {{
add("bro");
@@ -159,39 +160,18 @@ public class SensorIndexingConfigServiceImplTest {
}}, sensorIndexingConfigService.getAllTypes());
}
- @Test
- public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- assertEquals(new ArrayList<>(), sensorIndexingConfigService.getAllTypes());
- }
@Test
- public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
-
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(Exception.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- sensorIndexingConfigService.getAllTypes();
- }
-
- @Test
- public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
+ public void getAllShouldProperlyReturnIndexingConfigs() throws Exception {
final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
-
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ IndexingConfigurations configs = new IndexingConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig );
+ }
+ };
+ when(cache.get(eq(IndexingConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new HashMap() {{ put("bro", sensorIndexingConfig);}}, sensorIndexingConfigService.getAll());
}
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
index c96a796..7998c21 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
@@ -18,6 +18,7 @@
package org.apache.metron.rest.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import oi.thekraken.grok.api.Grok;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,9 @@ import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.ParseMessageRequest;
import org.apache.metron.rest.service.GrokService;
@@ -95,6 +98,8 @@ public class SensorParserConfigServiceImplTest {
private String user = "user1";
+ ConfigurationsCache cache;
+
@Before
public void setUp() throws Exception {
objectMapper = mock(ObjectMapper.class);
@@ -105,7 +110,8 @@ public class SensorParserConfigServiceImplTest {
SecurityContextHolder.getContext().setAuthentication(authentication);
when(environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY)).thenReturn("./target");
grokService = new GrokServiceImpl(environment, mock(Grok.class), new HdfsServiceImpl(new Configuration()));
- sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService);
+ cache = mock(ConfigurationsCache.class);
+ sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService, cache);
}
@@ -144,47 +150,36 @@ public class SensorParserConfigServiceImplTest {
}
@Test
- public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
+ public void findOneShouldProperlyReturnSensorParserConfig() throws Exception {
final SensorParserConfig sensorParserConfig = getTestBroSensorParserConfig();
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ ParserConfigurations configs = new ParserConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(ParserConfigurations.getKey("bro"), sensorParserConfig);
+ }
+ };
+ when(cache.get(eq(ParserConfigurations.class)))
+ .thenReturn(configs);
+ //We only have bro, so we should expect it to be returned
assertEquals(getTestBroSensorParserConfig(), sensorParserConfigService.findOne("bro"));
- }
-
- @Test
- public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- assertNull(sensorParserConfigService.findOne("bro"));
- }
-
- @Test
- public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
-
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
- sensorParserConfigService.findOne("bro");
+ //and blah should be a miss.
+ assertNull(sensorParserConfigService.findOne("blah"));
}
@Test
public void getAllTypesShouldProperlyReturnTypes() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- add("squid");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+ ParserConfigurations configs = new ParserConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(ParserConfigurations.getKey("bro"), new HashMap<>()
+ ,ParserConfigurations.getKey("squid"), new HashMap<>()
+ );
+ }
+ };
+ when(cache.get( eq(ParserConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new ArrayList() {{
add("bro");
@@ -193,41 +188,19 @@ public class SensorParserConfigServiceImplTest {
}
@Test
- public void getAllTypesShouldReturnEmptyListWhenNoNodeExceptionIsThrown() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- assertEquals(new ArrayList<>(), sensorParserConfigService.getAllTypes());
- }
-
- @Test
- public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
- exception.expect(RestException.class);
-
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(Exception.class);
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
- sensorParserConfigService.getAllTypes();
- }
-
- @Test
public void getAllShouldProperlyReturnSensorParserConfigs() throws Exception {
- GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
- when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
- .thenReturn(new ArrayList() {{
- add("bro");
- add("squid");
- }});
- when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
final SensorParserConfig broSensorParserConfig = getTestBroSensorParserConfig();
final SensorParserConfig squidSensorParserConfig = getTestSquidSensorParserConfig();
- GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
- when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
- when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/squid")).thenReturn(squidJson.getBytes());
- when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ ParserConfigurations configs = new ParserConfigurations(){
+ @Override
+ public Map<String, Object> getConfigurations() {
+ return ImmutableMap.of(ParserConfigurations.getKey("bro"), broSensorParserConfig
+ ,ParserConfigurations.getKey("squid"), squidSensorParserConfig
+ );
+ }
+ };
+ when(cache.get( eq(ParserConfigurations.class)))
+ .thenReturn(configs);
assertEquals(new ArrayList() {{
add(getTestBroSensorParserConfig());
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 3054881..8734d63 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -54,6 +54,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-zookeeper</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
@@ -289,11 +294,6 @@
<version>${global_jackson_version}</version>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${global_curator_version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index a97091a..6f15746 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -17,54 +17,58 @@
*/
package org.apache.metron.common.bolt;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+import org.apache.metron.zookeeper.ZKCache;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt {
+public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt implements Reloadable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String zookeeperUrl;
protected CuratorFramework client;
- protected TreeCache cache;
- private final CONFIG_T configurations = defaultConfigurations();
+ protected ZKCache cache;
+ private final CONFIG_T configurations;
public ConfiguredBolt(String zookeeperUrl) {
this.zookeeperUrl = zookeeperUrl;
+ this.configurations = createUpdater().defaultConfigurations();
}
public void setCuratorFramework(CuratorFramework client) {
this.client = client;
}
- public void setTreeCache(TreeCache cache) {
+ public void setZKCache(ZKCache cache) {
this.cache = cache;
}
+ @Override
public void reloadCallback(String name, ConfigurationType type) {
}
+
public CONFIG_T getConfigurations() {
return configurations;
}
- protected abstract CONFIG_T defaultConfigurations();
+ protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater();
@Override
@@ -85,30 +89,30 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba
//zookeeper.
ConfigurationsUtils.setupStellarStatically(client);
if (cache == null) {
- cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
- TreeCacheListener listener = new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
- String path = event.getData().getPath();
- byte[] data = event.getData().getData();
- updateConfig(path, data);
- }
- }
- };
- cache.getListenable().addListener(listener);
- loadConfig();
+ ConfigurationsUpdater<CONFIG_T> updater = createUpdater();
+ SimpleEventListener listener = new SimpleEventListener.Builder()
+ .with( updater::update
+ , TreeCacheEvent.Type.NODE_ADDED
+ , TreeCacheEvent.Type.NODE_UPDATED
+ )
+ .with( updater::delete
+ , TreeCacheEvent.Type.NODE_REMOVED
+ )
+ .build();
+ cache = new ZKCache.Builder()
+ .withClient(client)
+ .withListener(listener)
+ .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+ .build();
+ updater.forceUpdate(client);
+ cache.start();
}
- cache.start();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
- abstract public void loadConfig();
- abstract public void updateConfig(String path, byte[] data) throws IOException;
-
@Override
public void cleanup() {
cache.close();
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index 9c3ee97..54fd7e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,31 +37,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment
}
@Override
- protected EnrichmentConfigurations defaultConfigurations() {
- return new EnrichmentConfigurations();
- }
-
- @Override
- public void loadConfig() {
- try {
-
- ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
- } catch (Exception e) {
- LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
- }
- }
-
- @Override
- public void updateConfig(String path, byte[] data) throws IOException {
- if (data.length != 0) {
- String name = path.substring(path.lastIndexOf("/") + 1);
- if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
- getConfigurations().updateSensorEnrichmentConfig(name, data);
- reloadCallback(name, ConfigurationType.ENRICHMENT);
- } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- getConfigurations().updateGlobalConfig(data);
- reloadCallback(name, ConfigurationType.GLOBAL);
- }
- }
+ protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() {
+ return new EnrichmentUpdater(this, this::getConfigurations);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
index cddcada..09300e4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,30 +35,8 @@ public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConf
}
@Override
- protected IndexingConfigurations defaultConfigurations() {
- return new IndexingConfigurations();
+ protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() {
+ return new IndexingUpdater(this, this::getConfigurations);
}
- @Override
- public void loadConfig() {
- try {
- ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client);
- } catch (Exception e) {
- LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
- }
- }
-
- @Override
- public void updateConfig(String path, byte[] data) throws IOException {
- if (data.length != 0) {
- String name = path.substring(path.lastIndexOf("/") + 1);
- if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
- getConfigurations().updateSensorIndexingConfig(name, data);
- reloadCallback(name, ConfigurationType.INDEXING);
- } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- getConfigurations().updateGlobalConfig(data);
- reloadCallback(name, ConfigurationType.GLOBAL);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 99313fa..2f13658 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -23,6 +23,8 @@ import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,34 +43,14 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur
return getConfigurations().getSensorParserConfig(sensorType);
}
- @Override
- protected ParserConfigurations defaultConfigurations() {
- return new ParserConfigurations();
- }
-
public String getSensorType() {
return sensorType;
}
- @Override
- public void loadConfig() {
- try {
- ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
- } catch (Exception e) {
- LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
- }
- }
+
@Override
- public void updateConfig(String path, byte[] data) throws IOException {
- if (data.length != 0) {
- String name = path.substring(path.lastIndexOf("/") + 1);
- if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
- getConfigurations().updateSensorParserConfig(name, data);
- reloadCallback(name, ConfigurationType.PARSER);
- } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- getConfigurations().updateGlobalConfig(data);
- reloadCallback(name, ConfigurationType.GLOBAL);
- }
- }
+ protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+ return new ParserUpdater(this, this::getConfigurations);
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index 22ff3a9..90575d0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -17,16 +17,12 @@
*/
package org.apache.metron.common.bolt;
-import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,43 +42,8 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
}
@Override
- protected ProfilerConfigurations defaultConfigurations() {
- return new ProfilerConfigurations();
+ protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
+ return new ProfilerUpdater(this, this::getConfigurations);
}
- @Override
- public void loadConfig() {
- try {
- ProfilerConfig config = readFromZookeeper(client);
- if(config != null) {
- getConfigurations().updateProfilerConfig(config);
- }
-
- } catch (Exception e) {
- LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
- }
- }
-
- private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception {
- byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
- return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class);
- }
-
- @Override
- public void updateConfig(String path, byte[] data) throws IOException {
- if (data.length != 0) {
- String name = path.substring(path.lastIndexOf("/") + 1);
-
- // update the profiler configuration from zookeeper
- if (path.startsWith(ConfigurationType.PROFILER.getZookeeperRoot())) {
- getConfigurations().updateProfilerConfig(data);
- reloadCallback(name, ConfigurationType.PROFILER);
-
- // update the global configuration from zookeeper
- } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- getConfigurations().updateGlobalConfig(data);
- reloadCallback(name, ConfigurationType.GLOBAL);
- }
- }
- }
}