You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/04/25 13:27:41 UTC
metron git commit: METRON-1529 CONFIG_GET Fails to Retrieve Latest
Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997
Repository: metron
Updated Branches:
refs/heads/master b5bf9a987 -> 37e3fd32c
METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/37e3fd32
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/37e3fd32
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/37e3fd32
Branch: refs/heads/master
Commit: 37e3fd32c256ddc129eb7c1363d78e9095a39748
Parents: b5bf9a9
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Apr 25 09:27:18 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Apr 25 09:27:18 2018 -0400
----------------------------------------------------------------------
.../configuration/ConfigurationsUtils.java | 123 +++-
.../management/ConfigurationFunctions.java | 564 ++++++++++---------
.../management/ConfigurationFunctionsTest.java | 424 ++++++++++----
.../shell/DefaultStellarShellExecutor.java | 4 +-
.../common/utils/StellarProcessorUtils.java | 135 +++--
5 files changed, 825 insertions(+), 425 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index a89db63..c7b39f0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
@@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -235,12 +237,99 @@ public class ConfigurationsUtils {
);
}
+ /**
+ * Reads the global configuration stored in Zookeeper.
+ *
+ * @param client The Zookeeper client.
+ * @return The global configuration, if one exists. Otherwise, null.
+ * @throws Exception
+ */
+ public static Map<String, Object> readGlobalConfigFromZookeeper(CuratorFramework client) throws Exception {
+ Map<String, Object> config = null;
+
+ Optional<byte[]> bytes = readFromZookeeperSafely(GLOBAL.getZookeeperRoot(), client);
+ if(bytes.isPresent()) {
+ InputStream in = new ByteArrayInputStream(bytes.get());
+ config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+ }
+
+ return config;
+ }
+
+ /**
+ * Reads the Indexing configuration from Zookeeper.
+ *
+ * @param sensorType The type of sensor.
+ * @param client The Zookeeper client.
+ * @return The indexing configuration for the given sensor type, if one exists. Otherwise, null.
+ * @throws Exception
+ */
+ public static Map<String, Object> readSensorIndexingConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+ Map<String, Object> config = null;
+
+ Optional<byte[]> bytes = readFromZookeeperSafely(INDEXING.getZookeeperRoot() + "/" + sensorType, client);
+ if(bytes.isPresent()) {
+ InputStream in = new ByteArrayInputStream(bytes.get());
+ config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+ }
+
+ return config;
+ }
+
+ /**
+ * Reads the Enrichment configuration from Zookeeper.
+ *
+ * @param sensorType The type of sensor.
+ * @param client The Zookeeper client.
+ * @return The Enrichment configuration for the given sensor type, if one exists. Otherwise, null.
+ * @throws Exception
+ */
public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
- return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
+ SensorEnrichmentConfig config = null;
+
+ Optional<byte[]> bytes = readFromZookeeperSafely(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client);
+ if (bytes.isPresent()) {
+ config = SensorEnrichmentConfig.fromBytes(bytes.get());
+ }
+
+ return config;
}
+ /**
+ * Reads the Parser configuration from Zookeeper.
+ *
+ * @param sensorType The type of sensor.
+ * @param client The Zookeeper client.
+ * @return The Parser configuration for the given sensor type, if one exists. Otherwise, null.
+ * @throws Exception
+ */
public static SensorParserConfig readSensorParserConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
- return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, client)), SensorParserConfig.class);
+ SensorParserConfig config = null;
+
+ Optional<byte[]> bytes = readFromZookeeperSafely(PARSER.getZookeeperRoot() + "/" + sensorType, client);
+ if(bytes.isPresent()) {
+ config = SensorParserConfig.fromBytes(bytes.get());
+ }
+
+ return config;
+ }
+
+ /**
+ * Reads the Profiler configuration from Zookeeper.
+ *
+ * @param client The Zookeeper client.
+ * @return THe Profiler configuration.
+ * @throws Exception
+ */
+ public static ProfilerConfig readProfilerConfigFromZookeeper(CuratorFramework client) throws Exception {
+ ProfilerConfig config = null;
+
+ Optional<byte[]> bytes = readFromZookeeperSafely(PROFILER.getZookeeperRoot(), client);
+ if(bytes.isPresent()) {
+ config = ProfilerConfig.fromBytes(bytes.get());
+ }
+
+ return config;
}
public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
@@ -289,6 +378,36 @@ public class ConfigurationsUtils {
}
}
+ /**
+ * Read raw bytes from Zookeeper.
+ *
+ * @param path The path to the Zookeeper node to read.
+ * @param client The Zookeeper client.
+ * @return The bytes read from Zookeeper, if node exists. Otherwise, null.
+ * @throws Exception
+ */
+ public static Optional<byte[]> readFromZookeeperSafely(String path, CuratorFramework client) throws Exception {
+ Optional<byte[]> result = Optional.empty();
+
+ try {
+ byte[] bytes = readFromZookeeper(path, client);
+ result = Optional.of(bytes);
+
+ } catch(KeeperException.NoNodeException e) {
+ LOG.debug("Zookeeper node missing; path={}", e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Read raw bytes from Zookeeper.
+ *
+ * @param path The path to the Zookeeper node to read.
+ * @param client The Zookeeper client.
+ * @return The bytes read from Zookeeper.
+ * @throws Exception If the path does not exist in Zookeeper.
+ */
public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
if (client != null && client.getData() != null && path != null) {
return client.getData().forPath(path);
http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
index af90e14..5a1281c 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
@@ -18,26 +18,17 @@
package org.apache.metron.management;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
@@ -46,203 +37,280 @@ import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
+import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorIndexingConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorParserConfigToZookeeper;
+
+/**
+ * Defines functions that enable modification of Metron configuration values.
+ */
public class ConfigurationFunctions {
+
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static EnumMap<ConfigurationType, Object> configMap = new EnumMap<ConfigurationType, Object>(ConfigurationType.class) {{
- for(ConfigurationType ct : ConfigurationType.values()) {
- put(ct, Collections.synchronizedMap(new HashMap<String, String>()));
- }
- put(ConfigurationType.GLOBAL, "");
- put(ConfigurationType.PROFILER, "");
- }};
- private static synchronized void setupTreeCache(Context context) throws Exception {
- try {
- Optional<Object> treeCacheOpt = context.getCapability("treeCache");
- if (treeCacheOpt.isPresent()) {
- return;
- }
+
+
+ /**
+ * Retrieves the Zookeeper client from the execution context.
+ *
+ * @param context The execution context.
+ * @return A Zookeeper client, if one exists. Otherwise, an exception is thrown.
+ */
+ private static CuratorFramework getZookeeperClient(Context context) {
+
+ Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT, true);
+ if(clientOpt.isPresent()) {
+ return (CuratorFramework) clientOpt.get();
+
+ } else {
+ throw new IllegalStateException("Missing ZOOKEEPER_CLIENT; zookeeper connection required");
}
- catch(IllegalStateException ex) {
+ }
+ /**
+ * Get an argument from a list of arguments.
+ *
+ * @param index The index within the list of arguments.
+ * @param clazz The type expected.
+ * @param args All of the arguments.
+ * @param <T> The type of the argument expected.
+ */
+ public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+ if(index >= args.size()) {
+ throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
}
- Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
- if(!clientOpt.isPresent()) {
- throw new IllegalStateException("I expected a zookeeper client to exist and it did not. Please connect to zookeeper.");
+
+ return ConversionUtils.convert(args.get(index), clazz);
+ }
+
+ /**
+ * Serializes a configuration object to the raw JSON.
+ *
+ * @param object The configuration object to serialize
+ * @return
+ */
+ private static String toJSON(Object object) {
+
+ if(object == null) {
+ return null;
}
- CuratorFramework client = (CuratorFramework) clientOpt.get();
- TreeCache cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
- TreeCacheListener listener = new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
- String path = event.getData().getPath();
- byte[] data = event.getData().getData();
- String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
- if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
- sensorMap.put(sensor, new String(data));
- } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- configMap.put(ConfigurationType.GLOBAL, new String(data));
- } else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
- configMap.put(ConfigurationType.PROFILER, new String(data));
- } else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
- sensorMap.put(sensor, new String(data));
- } else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
- sensorMap.put(sensor, new String(data));
- }
- }
- else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
- String path = event.getData().getPath();
- String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
- if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
- sensorMap.remove(sensor);
- }
- else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
- sensorMap.remove(sensor);
- }
- else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
- sensorMap.remove(sensor);
- }
- else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
- configMap.put(ConfigurationType.PROFILER, null);
- }
- else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
- configMap.put(ConfigurationType.GLOBAL, null);
- }
- }
- }
- };
- cache.getListenable().addListener(listener);
- cache.start();
- for(ConfigurationType ct : ConfigurationType.values()) {
- switch(ct) {
- case GLOBAL:
- case PROFILER:
- {
- String data = "";
- try {
- byte[] bytes = ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot(), client);
- data = new String(bytes);
- }
- catch(Exception ex) {
-
- }
- configMap.put(ct, data);
- }
- break;
- case INDEXING:
- case ENRICHMENT:
- case PARSER:
- {
- List<String> sensorTypes = client.getChildren().forPath(ct.getZookeeperRoot());
- Map<String, String> sensorMap = (Map<String, String>)configMap.get(ct);
- for(String sensorType : sensorTypes) {
- sensorMap.put(sensorType, new String(ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot() + "/" + sensorType, client)));
- }
- }
- break;
- }
+
+ try {
+ return JSONUtils.INSTANCE.toJSON(object, true);
+
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
}
- context.addCapability("treeCache", () -> cache);
}
@Stellar(
- namespace = "CONFIG"
- ,name = "GET"
- ,description = "Retrieve a Metron configuration from zookeeper."
- ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
- , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
- , "emptyIfNotPresent - If true, then return an empty, minimally viable config"
- }
- ,returns = "The String representation of the config in zookeeper"
- )
+ namespace = "CONFIG",
+ name = "GET",
+ description = "Retrieve a Metron configuration from zookeeper.",
+ params = {
+ "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+ "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)",
+ "emptyIfNotPresent - If true, then return an empty, minimally viable config"
+ },
+ returns = "The String representation of the config in zookeeper")
public static class ConfigGet implements StellarFunction {
- boolean initialized = false;
+
+ /**
+ * Whether the function has been initialized.
+ */
+ private boolean initialized = false;
+
+ /**
+ * The Zookeeper client.
+ */
+ private CuratorFramework zkClient;
+
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
- ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
- boolean emptyIfNotPresent = true;
+ String result;
- switch(type) {
- case GLOBAL:
- case PROFILER:
- return configMap.get(type);
- case PARSER: {
- String sensor = (String) args.get(1);
- if(args.size() > 2) {
- emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
- }
- Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
- String ret = sensorMap.get(sensor);
- if (ret == null && emptyIfNotPresent ) {
- SensorParserConfig config = new SensorParserConfig();
- config.setSensorTopic(sensor);
- try {
- ret = JSONUtils.INSTANCE.toJSON(config, true);
- } catch (JsonProcessingException e) {
- LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
- throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
- }
- }
- return ret;
- }
- case INDEXING: {
- String sensor = (String) args.get(1);
- if(args.size() > 2) {
- emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
- }
- Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
- String ret = sensorMap.get(sensor);
- if (ret == null && emptyIfNotPresent ) {
- Map<String, Object> config = new HashMap<>();
- try {
- ret = JSONUtils.INSTANCE.toJSON(config, true);
- IndexingConfigurations.setIndex(config, sensor);
- } catch (JsonProcessingException e) {
- LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
- throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
- }
- }
- return ret;
- }
- case ENRICHMENT: {
- String sensor = (String) args.get(1);
- if(args.size() > 2) {
- emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
- }
- Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
- String ret = sensorMap.get(sensor);
- if (ret == null && emptyIfNotPresent ) {
- SensorEnrichmentConfig config = new SensorEnrichmentConfig();
- try {
- ret = JSONUtils.INSTANCE.toJSON(config, true);
- } catch (JsonProcessingException e) {
- LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
- throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
- }
- }
- return ret;
+ // the configuration type to write
+ String arg0 = getArg(0, String.class, args);
+ ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+ try {
+
+ if (GLOBAL == type) {
+ result = getGlobalConfig(args);
+
+ } else if (PROFILER == type) {
+ result = getProfilerConfig(args);
+
+ } else if (ENRICHMENT == type) {
+ result = getEnrichmentConfig(args);
+
+ } else if (INDEXING == type) {
+ result = getIndexingConfig(args);
+
+ } else if (PARSER == type) {
+ result = getParserConfig(args);
+
+ } else {
+ throw new IllegalArgumentException("Unexpected configuration type: " + type);
}
- default:
- throw new UnsupportedOperationException("Unable to support type " + type);
+
+ } catch(Exception e) {
+ throw new RuntimeException(e);
}
+
+ return result;
}
- @Override
- public void initialize(Context context) {
- try {
- setupTreeCache(context);
- } catch (Exception e) {
- LOG.error("Unable to initialize: {}", e.getMessage(), e);
+ /**
+ * Retrieves the Global configuration.
+ *
+ * @return The Global configuration.
+ * @throws Exception
+ */
+ private String getGlobalConfig(List<Object> args) throws Exception {
+
+ Map<String, Object> globals = readGlobalConfigFromZookeeper(zkClient);
+
+ // provide empty/default config if one is not present?
+ if(globals == null && emptyIfNotPresent(args)) {
+ globals = new HashMap<>();
}
- finally {
- initialized = true;
+
+ return toJSON(globals);
+ }
+
+ /**
+ * Retrieves the Parser configuration.
+ *
+ * @param args The function arguments.
+ * @return The Parser configuration.
+ * @throws Exception
+ */
+ private String getParserConfig(List<Object> args) throws Exception {
+
+ // retrieve the enrichment config for the given sensor
+ String sensor = getArg(1, String.class, args);
+ SensorParserConfig sensorConfig = readSensorParserConfigFromZookeeper(sensor, zkClient);
+
+ // provide empty/default config if one is not present?
+ if(sensorConfig == null && emptyIfNotPresent(args)) {
+ sensorConfig = new SensorParserConfig();
}
+
+ return toJSON(sensorConfig);
+ }
+
+ /**
+ * Retrieve the Enrichment configuration.
+ *
+ * @param args The function arguments.
+ * @return The Enrichment configuration as a JSON string.
+ * @throws Exception
+ */
+ private String getEnrichmentConfig(List<Object> args) throws Exception {
+
+ // retrieve the enrichment config for the given sensor
+ String sensor = getArg(1, String.class, args);
+ SensorEnrichmentConfig sensorConfig = readSensorEnrichmentConfigFromZookeeper(sensor, zkClient);
+
+ // provide empty/default config if one is not present?
+ if(sensorConfig == null && emptyIfNotPresent(args)) {
+ sensorConfig = new SensorEnrichmentConfig();
+ }
+
+ return toJSON(sensorConfig);
+ }
+
+ /**
+ * Retrieve the Indexing configuration.
+ *
+ * @param args The function arguments.
+ * @return The Indexing configuration as a JSON string.
+ * @throws Exception
+ */
+ private String getIndexingConfig(List<Object> args) throws Exception {
+
+ // retrieve the enrichment config for the given sensor
+ String sensor = getArg(1, String.class, args);
+ Map<String, Object> sensorConfig = readSensorIndexingConfigFromZookeeper(sensor, zkClient);
+
+ // provide empty/default config if one is not present?
+ if(sensorConfig == null && emptyIfNotPresent(args)) {
+ sensorConfig = Collections.emptyMap();
+ }
+
+ return toJSON(sensorConfig);
+ }
+
+ /**
+ * Retrieve the Profiler configuration.
+ *
+ * @param args The function arguments.
+ * @return The Profiler configuration as a JSON string.
+ * @throws Exception
+ */
+ private String getProfilerConfig(List<Object> args) throws Exception {
+
+ ProfilerConfig profilerConfig = readProfilerConfigFromZookeeper(zkClient);
+
+ // provide empty/default config if one is not present?
+ if(profilerConfig == null && emptyIfNotPresent(args)) {
+ profilerConfig = new ProfilerConfig();
+ }
+
+ return toJSON(profilerConfig);
+ }
+
+ /**
+ * Retrieves the 'emptyIfNotPresent' argument.
+ *
+ * <p>This determines whether a default configuration should be returned, if no
+ * configuration is not present. This defaults to true.
+ *
+ * @param args The function arguments.
+ * @return The 'emptyIfNotPresent' argument.
+ * @throws Exception
+ */
+ private boolean emptyIfNotPresent(List<Object> args) {
+
+ boolean emptyIfNotPresent = true;
+ int lastIndex = args.size() - 1;
+
+ // expect 'emptyIfNotPresent' to always be the last boolean arg
+ if(args.size() >= 2 && args.get(lastIndex) instanceof Boolean) {
+ emptyIfNotPresent = getArg(lastIndex, Boolean.class, args);
+ }
+
+ return emptyIfNotPresent;
+ }
+
+ @Override
+ public void initialize(Context context) {
+ zkClient = getZookeeperClient(context);
}
@Override
@@ -250,91 +318,69 @@ public class ConfigurationFunctions {
return initialized;
}
}
+
@Stellar(
- namespace = "CONFIG"
- ,name = "PUT"
- ,description = "Updates a Metron config to Zookeeper."
- ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
- ,"config - The config (a string in JSON form) to update"
- , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
- }
- ,returns = "The String representation of the config in zookeeper"
- )
+ namespace = "CONFIG",
+ name = "PUT",
+ description = "Updates a Metron config to Zookeeper.",
+ params = {
+ "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+ "config - The config (a string in JSON form) to update",
+ "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
+ },
+ returns = "The String representation of the config in zookeeper")
public static class ConfigPut implements StellarFunction {
- private CuratorFramework client;
- private boolean initialized = false;
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
- ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
- String config = (String)args.get(1);
- if(config == null) {
- return null;
- }
- try {
- switch (type) {
- case GLOBAL:
- ConfigurationsUtils.writeGlobalConfigToZookeeper(config.getBytes(), client);
- break;
- case PROFILER:
- ConfigurationsUtils.writeProfilerConfigToZookeeper(config.getBytes(), client);
- break;
- case ENRICHMENT:
- {
- String sensor = (String) args.get(2);
- if(sensor == null) {
- return null;
- }
- ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.getBytes(), client);
- }
- break;
- case INDEXING:
- {
- String sensor = (String) args.get(2);
- if(sensor == null) {
- return null;
- }
- ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensor, config.getBytes(), client);
- }
- break;
- case PARSER:
- {
- String sensor = (String) args.get(2);
- if(sensor == null) {
- return null;
- }
- ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensor, config.getBytes(), client);
+
+ // the configuration type to write
+ String arg0 = getArg(0, String.class, args);
+ ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+ // the configuration value to write
+ String value = getArg(1, String.class, args);
+ if(value != null) {
+
+ CuratorFramework client = getZookeeperClient(context);
+ try {
+
+ if(GLOBAL == type) {
+ writeGlobalConfigToZookeeper(value.getBytes(), client);
+
+ } else if(PROFILER == type) {
+ writeProfilerConfigToZookeeper(value.getBytes(), client);
+
+ } else if(ENRICHMENT == type) {
+ String sensor = getArg(2, String.class, args);
+ writeSensorEnrichmentConfigToZookeeper(sensor, value.getBytes(), client);
+
+ } else if(INDEXING == type) {
+ String sensor = getArg(2, String.class, args);
+ writeSensorIndexingConfigToZookeeper(sensor, value.getBytes(), client);
+
+ } else if (PARSER == type) {
+ String sensor = getArg(2, String.class, args);
+ writeSensorParserConfigToZookeeper(sensor, value.getBytes(), client);
}
- break;
+
+ } catch(Exception e) {
+ LOG.error("Unexpected exception: {}", e.getMessage(), e);
+ throw new ParseException(e.getMessage());
}
}
- catch(Exception ex) {
- LOG.error("Unable to put config: {}", ex.getMessage(), ex);
- throw new ParseException("Unable to put config: " + ex.getMessage(), ex);
- }
+
return null;
}
@Override
public void initialize(Context context) {
- Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
- if(!clientOpt.isPresent()) {
- throw new IllegalStateException("I expected a zookeeper client to exist and it did not. Please connect to zookeeper.");
- }
- client = (CuratorFramework) clientOpt.get();
- try {
- setupTreeCache(context);
- } catch (Exception e) {
- LOG.error("Unable to initialize: {}", e.getMessage(), e);
- }
- finally {
- initialized = true;
- }
+ // nothing to do
}
@Override
public boolean isInitialized() {
- return initialized;
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 1920031..67e2a9d 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -19,194 +19,393 @@ package org.apache.metron.management;
import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.PosixParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.log4j.Level;
import org.apache.metron.common.cli.ConfigurationManager;
import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.test.utils.UnitTestHelper;
-import org.json.simple.parser.JSONParser;
import org.json.simple.JSONObject;
-import org.junit.Assert;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH;
import static org.apache.metron.TestConstants.SAMPLE_CONFIG_PATH;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
import static org.apache.metron.management.utils.FileUtils.slurp;
import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests the ConfigurationFunctions class.
+ */
public class ConfigurationFunctionsTest {
+
private static TestingServer testZkServer;
- private static CuratorFramework client;
private static String zookeeperUrl;
- private Context context = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .build();
+ private static CuratorFramework client;
+ private static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+ private static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+ private static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+ private static String goodTestIndexingConfig = slurp( SAMPLE_CONFIG_PATH + "/indexing/test.json");
+
+ private Context context;
+ private JSONParser parser;
+
+ /**
+ * {
+ * "profiles" : [
+ * {
+ * "profile" : "counter",
+ * "foreach" : "ip_src_addr",
+ * "init" : { "counter" : 0 },
+ * "update" : { "counter" : "counter + 1" },
+ * "result" : "counter"
+ * }
+ * ],
+ * "timestampField" : "timestamp"
+ * }
+ */
+ @Multiline
+ private static String goodProfilerConfig;
+
@BeforeClass
- public static void setup() throws Exception {
+ public static void setupZookeeper() throws Exception {
+
+ // zookeeper server
testZkServer = new TestingServer(true);
zookeeperUrl = testZkServer.getConnectString();
+
+ // zookeeper client
client = ConfigurationsUtils.getClient(zookeeperUrl);
client.start();
+ }
- pushConfigs(SAMPLE_CONFIG_PATH);
- pushConfigs(PARSER_CONFIGS_PATH);
+ @Before
+ public void setup() throws Exception {
+ context = new Context.Builder()
+ .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .build();
+
+ parser = new JSONParser();
+ // push configs to zookeeper
+ pushConfigs(SAMPLE_CONFIG_PATH, zookeeperUrl);
+ pushConfigs(PARSER_CONFIGS_PATH, zookeeperUrl);
+ writeProfilerConfigToZookeeper(goodProfilerConfig.getBytes(), client);
}
- private static void pushConfigs(String inputPath) throws Exception {
- String[] args = new String[]{
- "-z", zookeeperUrl
- , "--mode", "PUSH"
- , "--input_dir", inputPath
- };
- ConfigurationManager manager = new ConfigurationManager();
- manager.run(ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args));
+ /**
+ * Deletes a path within Zookeeper.
+ *
+ * @param path The path within Zookeeper to delete.
+ * @throws Exception
+ */
+ private void deletePath(String path) throws Exception {
+ client.delete().forPath(path);
}
+ /**
+ * Transforms a String to a {@link JSONObject}.
+ *
+ * @param input The input String to transform
+ * @return A {@link JSONObject}.
+ * @throws org.json.simple.parser.ParseException
+ */
+ private JSONObject toJSONObject(String input) throws org.json.simple.parser.ParseException {
- static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+ if(input == null) {
+ return null;
+ }
+ return (JSONObject) parser.parse(input.trim());
+ }
/**
- {
- "sensorTopic" : "brop",
- "parserConfig" : { },
- "fieldTransformations" : [ ],
- "readMetadata":false,
- "mergeMetadata":false,
- "parserParallelism" : 1,
- "errorWriterParallelism" : 1,
- "spoutNumTasks" : 1,
- "stormConfig" : {},
- "errorWriterNumTasks":1,
- "spoutConfig":{},
- "parserNumTasks":1,
- "spoutParallelism":1
- }
+ * Push configuration values to Zookeeper.
+ *
+ * @param inputPath The local filesystem path to the configurations.
+ * @param zookeeperUrl The URL of Zookeeper.
+ * @throws Exception
*/
- @Multiline
- static String defaultBropParserConfig;
+ private static void pushConfigs(String inputPath, String zookeeperUrl) throws Exception {
+
+ String[] args = new String[] {
+ "-z", zookeeperUrl,
+ "--mode", "PUSH",
+ "--input_dir", inputPath
+ };
+ CommandLine cli = ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args);
+ ConfigurationManager manager = new ConfigurationManager();
+ manager.run(cli);
+ }
+ /**
+ * The CONFIG_GET function should be able to return the Parser configuration
+ * for a given sensor.
+ */
@Test
- public void testParserGetHappyPath() {
+ public void testGetParser() throws Exception {
+
+ String out = (String) run("CONFIG_GET('PARSER', 'bro')", context);
- Object out = run("CONFIG_GET('PARSER', 'bro')", new HashMap<>(), context);
- Assert.assertEquals(goodBroParserConfig, out);
+ SensorParserConfig actual = SensorParserConfig.fromBytes(out.getBytes());
+ SensorParserConfig expected = SensorParserConfig.fromBytes(goodBroParserConfig.getBytes());
+ assertEquals(expected, actual);
}
+ /**
+ * The CONFIG_GET function should NOT return any configuration when the
+ * Parser configuration for a given sensor is missing AND emptyIfNotPresent = false.
+ */
@Test
- public void testParserGetMissWithoutDefault() {
+ public void testGetParserMissWithoutDefault() {
- {
- Object out = run("CONFIG_GET('PARSER', 'brop', false)", new HashMap<>(), context);
- Assert.assertNull(out);
- }
+ // expect null because emptyIfNotPresent = false
+ Object out = run("CONFIG_GET('PARSER', 'sensor', false)", context);
+ assertNull(out);
}
+ /**
+ * The CONFIG_GET function should return a default configuration when none
+ * currently exists.
+ */
@Test
- public void testParserGetMissWithDefault() throws Exception {
- JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropParserConfig);
+ public void testGetParserMissWithDefault() throws Exception {
+ SensorParserConfig expected = new SensorParserConfig();
{
- Object out = run("CONFIG_GET('PARSER', 'brop')", new HashMap<>(), context);
- JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
- Assert.assertEquals(expected, actual);
+ Object out = run("CONFIG_GET('PARSER', 'sensor')", context);
+ SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+ assertEquals(expected, actual);
}
{
- Object out = run("CONFIG_GET('PARSER', 'brop', true)", new HashMap<>(), context);
- JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
- Assert.assertEquals(expected, actual);
+ Object out = run("CONFIG_GET('PARSER', 'sensor', true)", context);
+ SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+ assertEquals(expected, actual);
}
}
- static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+ /**
+ * The CONFIG_GET function should be able to return the Enrichment configuration
+ * for a given sensor.
+ */
+ @Test
+ public void testGetEnrichment() throws Exception {
+
+ String out = (String) run("CONFIG_GET('ENRICHMENT', 'test')", context);
+
+ SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+ SensorEnrichmentConfig expected = SensorEnrichmentConfig.fromBytes(goodTestEnrichmentConfig.getBytes());
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * No default configuration should be provided in this case.
+ */
+ @Test
+ public void testGetEnrichmentMissWithoutDefault() {
+
+ // expect null because emptyIfNotPresent = false
+ Object out = run("CONFIG_GET('ENRICHMENT', 'sense', false)", context);
+ assertNull(out);
+ }
/**
+ * A default empty configuration should be provided, if one does not exist.
+ */
+ @Test
+ public void testGetEnrichmentMissWithDefault() throws Exception {
+
+ // expect an empty configuration to be returned
+ SensorEnrichmentConfig expected = new SensorEnrichmentConfig();
{
- "enrichment" : {
- "fieldMap" : { },
- "fieldToTypeMap" : { },
- "config" : { }
- },
- "threatIntel" : {
- "fieldMap" : { },
- "fieldToTypeMap" : { },
- "config" : { },
- "triageConfig" : {
- "riskLevelRules" : [ ],
- "aggregator" : "MAX",
- "aggregationConfig" : { }
- }
- },
- "configuration" : { }
+ String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor')", context);
+ SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+ assertEquals(expected, actual);
+ }
+ {
+ String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor', true)", context);
+ SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+ assertEquals(expected, actual);
}
+ }
+
+ /**
+ * The CONFIG_GET function should be able to return the Indexing configuration
+ * for a given sensor.
*/
- @Multiline
- static String defaultBropEnrichmentConfig;
+ @Test
+ public void testGetIndexing() throws Exception {
+ String out = (String) run("CONFIG_GET('INDEXING', 'test')", context);
+
+ Map<String, Object> actual = toJSONObject(out);
+ Map<String, Object> expected = toJSONObject(goodTestIndexingConfig);
+ assertEquals(expected, actual);
+ }
+ /**
+ * No default configuration should be provided in this case.
+ */
@Test
- public void testEnrichmentGetHappyPath() {
+ public void testGetIndexingMissWithoutDefault() {
- Object out = run("CONFIG_GET('ENRICHMENT', 'test')", new HashMap<>(), context);
- Assert.assertEquals(goodTestEnrichmentConfig, out.toString().trim());
+ // expect null because emptyIfNotPresent = false
+ Object out = run("CONFIG_GET('INDEXING', 'sense', false)", context);
+ assertNull(out);
}
+ /**
+ * A default empty configuration should be provided, if one does not exist.
+ */
@Test
- public void testEnrichmentGetMissWithoutDefault() {
+ public void testGetIndexingtMissWithDefault() throws Exception {
+ // expect an empty configuration to be returned
+ Map<String, Object> expected = Collections.emptyMap();
+ {
+ String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor')", context);
+ Map<String, Object> actual = toJSONObject(out);
+ assertEquals(expected, actual);
+ }
{
- Object out = run("CONFIG_GET('ENRICHMENT', 'brop', false)", new HashMap<>(), context);
- Assert.assertNull(out);
+ String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor', true)", context);
+ Map<String, Object> actual = toJSONObject(out);
+ assertEquals(expected, actual);
}
}
+ /**
+ * The CONFIG_GET function should be able to return the Profiler configuration.
+ */
+ @Test
+ public void testGetProfiler() throws Exception {
+
+ String out = (String) run("CONFIG_GET('PROFILER')", context);
+
+ ProfilerConfig actual = ProfilerConfig.fromBytes(out.getBytes());
+ ProfilerConfig expected = ProfilerConfig.fromBytes(goodProfilerConfig.getBytes());
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * No default configuration should be provided in this case.
+ */
@Test
- public void testEnrichmentGetMissWithDefault() throws Exception {
- JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropEnrichmentConfig);
+ public void testGetProfilerMissWithoutDefault() throws Exception {
+
+ deletePath(PROFILER.getZookeeperRoot());
+ // expect null because emptyIfNotPresent = false
+ String out = (String) run("CONFIG_GET('PROFILER', false)", context);
+ assertNull(out);
+ }
+
+ /**
+ * A default empty configuration should be provided, if one does not exist.
+ */
+ @Test
+ public void testGetProfilerMissWithDefault() throws Exception {
+
+ // there is no profiler config in zookeeper
+ deletePath(PROFILER.getZookeeperRoot());
+
+ // expect an empty configuration to be returned
+ ProfilerConfig expected = new ProfilerConfig();
{
- Object out = run("CONFIG_GET('ENRICHMENT', 'brop')", new HashMap<>(), context);
- JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
- Assert.assertEquals(expected, actual);
+ String out = (String) run("CONFIG_GET('PROFILER', true)", context);
+ ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+ assertEquals(expected, actual);
}
{
- Object out = run("CONFIG_GET('ENRICHMENT', 'brop', true)", new HashMap<>(), context);
- JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
- Assert.assertEquals(expected, actual);
+ String out = (String) run("CONFIG_GET('PROFILER')", context);
+ ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+ assertEquals(expected, actual);
}
}
- static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+ @Test
+ public void testGetGlobal() throws Exception {
+
+ String out = (String) run("CONFIG_GET('GLOBAL')", context);
+
+ Map<String, Object> actual = toJSONObject(out);
+ Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * No default configuration should be provided in this case.
+ */
+ @Test
+ public void testGetGlobalMissWithoutDefault() throws Exception {
+
+ // there is no global config in zookeeper
+ deletePath(GLOBAL.getZookeeperRoot());
+
+ // expect null because emptyIfNotPresent = false
+ Object out = run("CONFIG_GET('GLOBAL', false)", context);
+ assertNull(out);
+ }
+ /**
+ * A default empty configuration should be provided, if one does not exist.
+ */
@Test
- public void testGlobalGet() {
+ public void testGetGlobalMissWithDefault() throws Exception {
+
+ // there is no global config in zookeeper
+ deletePath(GLOBAL.getZookeeperRoot());
- Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
- Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+ // expect an empty configuration to be returned
+ Map<String, Object> expected = Collections.emptyMap();
+ {
+ String out = (String) run("CONFIG_GET('GLOBAL')", context);
+ Map<String, Object> actual = toJSONObject(out);
+ assertEquals(expected, actual);
+ }
+ {
+ String out = (String) run("CONFIG_GET('GLOBAL', true)", context);
+ Map<String, Object> actual = toJSONObject(out);
+ assertEquals(expected, actual);
+ }
}
@Test
- public void testGlobalPut() {
+ public void testPutGlobal() throws Exception {
+
+ String out = (String) run("CONFIG_GET('GLOBAL')", context);
- Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
- Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+ Map<String, Object> actual = toJSONObject(out);
+ Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+ assertEquals(expected, actual);
}
@Test(expected=ParseException.class)
- public void testGlobalPutBad() {
+ public void testPutGlobalBad() {
{
UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
try {
- run("CONFIG_PUT('GLOBAL', 'foo bar')", new HashMap<>(), context);
+ run("CONFIG_PUT('GLOBAL', 'foo bar')", context);
} catch(ParseException e) {
UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.ERROR);
throw e;
@@ -215,23 +414,23 @@ public class ConfigurationFunctionsTest {
}
@Test
- public void testIndexingPut() throws InterruptedException {
- String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", new HashMap<>(), context);
+ public void testPutIndexing() throws InterruptedException {
+ String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", context);
run("CONFIG_PUT('INDEXING', config, 'testIndexingPut')", ImmutableMap.of("config", brop), context);
boolean foundMatch = false;
for(int i = 0;i < 10 && !foundMatch;++i) {
- String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", new HashMap<>(), context);
+ String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", context);
foundMatch = brop.equals(bropNew);
if(foundMatch) {
break;
}
Thread.sleep(2000);
}
- Assert.assertTrue(foundMatch);
+ assertTrue(foundMatch);
}
@Test(expected= ParseException.class)
- public void testIndexingPutBad() throws InterruptedException {
+ public void testPutIndexingBad() throws InterruptedException {
{
{
UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -246,23 +445,26 @@ public class ConfigurationFunctionsTest {
}
@Test
- public void testEnrichmentPut() throws InterruptedException {
- String brop= (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut')", new HashMap<>(), context);
- run("CONFIG_PUT('ENRICHMENT', config, 'testEnrichmentPut')", ImmutableMap.of("config", brop), context);
+ public void testPutEnrichment() throws InterruptedException {
+ String config = (String) run("CONFIG_GET('ENRICHMENT', 'sensor')", context);
+ assertNotNull(config);
+
+ run("CONFIG_PUT('ENRICHMENT', config, 'sensor')", ImmutableMap.of("config", config), context);
+
boolean foundMatch = false;
for(int i = 0;i < 10 && !foundMatch;++i) {
- String bropNew = (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut', false)", new HashMap<>(), context);
- foundMatch = brop.equals(bropNew);
+ String newConfig = (String) run("CONFIG_GET('ENRICHMENT', 'sensor', false)", context);
+ foundMatch = config.equals(newConfig);
if(foundMatch) {
break;
}
Thread.sleep(2000);
}
- Assert.assertTrue(foundMatch);
+ assertTrue(foundMatch);
}
@Test(expected= ParseException.class)
- public void testEnrichmentPutBad() throws InterruptedException {
+ public void testPutEnrichmentBad() throws InterruptedException {
{
{
UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -277,23 +479,23 @@ public class ConfigurationFunctionsTest {
}
@Test
- public void testParserPut() throws InterruptedException {
- String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", new HashMap<>(), context);
+ public void testPutParser() throws InterruptedException {
+ String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", context);
run("CONFIG_PUT('PARSER', config, 'testParserPut')", ImmutableMap.of("config", brop), context);
boolean foundMatch = false;
for(int i = 0;i < 10 && !foundMatch;++i) {
- String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", new HashMap<>(), context);
+ String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", context);
foundMatch = brop.equals(bropNew);
if(foundMatch) {
break;
}
Thread.sleep(2000);
}
- Assert.assertTrue(foundMatch);
+ assertTrue(foundMatch);
}
@Test(expected= ParseException.class)
- public void testParserPutBad() throws InterruptedException {
+ public void testPutParserBad() throws InterruptedException {
{
UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
try {
http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
index 781a0cf..352ae2b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
@@ -52,7 +52,6 @@ import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -342,15 +341,18 @@ public class DefaultStellarShellExecutor implements StellarShellExecutor {
* @param zkClient An optional Zookeeper client.
*/
private Context createContext(Properties properties, Optional<CuratorFramework> zkClient) throws Exception {
+
Context.Builder contextBuilder = new Context.Builder();
Map<String, Object> globals;
if (zkClient.isPresent()) {
+ LOG.debug("Zookeeper client present; fetching globals from Zookeeper.");
// fetch globals from zookeeper
globals = fetchGlobalConfig(zkClient.get());
contextBuilder.with(ZOOKEEPER_CLIENT, () -> zkClient.get());
} else {
+ LOG.debug("No Zookeeper client; initializing empty globals.");
// use empty globals to allow a user to '%define' their own
globals = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
index 5912657..d5f267e 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
@@ -18,17 +18,18 @@
package org.apache.metron.stellar.common.utils;
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.common.StellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.DefaultVariableResolver;
import org.apache.metron.stellar.dsl.MapVariableResolver;
import org.apache.metron.stellar.dsl.StellarFunctions;
import org.apache.metron.stellar.dsl.VariableResolver;
-import com.google.common.collect.ImmutableList;
-import org.apache.metron.stellar.common.StellarPredicateProcessor;
-import org.apache.metron.stellar.common.StellarProcessor;
import org.junit.Assert;
import java.util.AbstractMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
@@ -39,39 +40,76 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+/**
+ * Utilities for executing and validating Stellar expressions.
+ */
public class StellarProcessorUtils {
- /**
- * This utility class is intended for use while unit testing Stellar operators.
- * It is included in the "main" code so third-party operators will not need
- * a test dependency on Stellar's test-jar.
- *
- * This class ensures the basic contract of a stellar expression is adhered to:
- * 1. Validate works on the expression
- * 2. The output can be serialized and deserialized properly
- *
- * @param rule
- * @param variables
- * @param context
- * @return ret
- */
- public static Object run(String rule, Map<String, Object> variables, Context context) {
- StellarProcessor processor = new StellarProcessor();
- Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
- Object ret = processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x),x-> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context);
- byte[] raw = SerDeUtils.toBytes(ret);
- Object actual = SerDeUtils.fromBytes(raw, Object.class);
- Assert.assertEquals(ret, actual);
- return ret;
- }
+ /**
+ * Execute and validate a Stellar expression.
+ *
+ * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression
+ * validates successfully and produces a result that can be serialized correctly.
+ *
+ * @param expression The expression to execute.
+ * @param variables The variables to expose to the expression.
+ * @param context The execution context.
+ * @return The result of executing the expression.
+ */
+ public static Object run(String expression, Map<String, Object> variables, Context context) {
+
+ // validate the expression
+ StellarProcessor processor = new StellarProcessor();
+ Assert.assertTrue("Invalid expression; expr=" + expression,
+ processor.validate(expression, context));
+
+ // execute the expression
+ Object ret = processor.parse(
+ expression,
+ new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)),
+ StellarFunctions.FUNCTION_RESOLVER(),
+ context);
+
+ // ensure the result can be serialized/deserialized
+ byte[] raw = SerDeUtils.toBytes(ret);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ Assert.assertEquals(ret, actual);
+
+ return ret;
+ }
+
+ /**
+ * Execute and validate a Stellar expression.
+ *
+ * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression
+ * validates successfully and produces a result that can be serialized correctly.
+ *
+ * @param expression The expression to execute.
+ * @param variables The variables to expose to the expression.
+ * @return The result of executing the expression.
+ */
+ public static Object run(String expression, Map<String, Object> variables) {
+ return run(expression, variables, Context.EMPTY_CONTEXT());
+ }
- public static Object run(String rule, Map<String, Object> variables) {
- return run(rule, variables, Context.EMPTY_CONTEXT());
+ /**
+ * Execute and validate a Stellar expression.
+ *
+ * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression
+ * validates successfully and produces a result that can be serialized correctly.
+ *
+ * @param expression The expression to execute.
+ * @param context The execution context.
+ * @return The result of executing the expression.
+ */
+ public static Object run(String expression, Context context) {
+ return run(expression, Collections.emptyMap(), context);
}
- public static void validate(String rule, Context context) {
+ public static void validate(String expression, Context context) {
StellarProcessor processor = new StellarProcessor();
- Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+ Assert.assertTrue("Invalid expression; expr=" + expression,
+ processor.validate(expression, context));
}
public static void validate(String rule) {
@@ -101,19 +139,18 @@ public class StellarProcessorUtils {
}
public static void runWithArguments(String function, List<Object> arguments, Object expected) {
- Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport.stream(new XRange(arguments.size()), false)
- .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
+ Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport
+ .stream(new XRange(arguments.size()), false)
+ .map(i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
- String args = kvStream.get().map( kv -> kv.getKey())
- .collect(Collectors.joining(","));
+ String args = kvStream.get().map(kv -> kv.getKey()).collect(Collectors.joining(","));
Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue()));
- String stellarStatement = function + "(" + args + ")";
+ String stellarStatement = function + "(" + args + ")";
String reason = stellarStatement + " != " + expected + " with variables: " + variables;
- if(expected instanceof Double) {
- Assert.assertEquals(reason, (Double)expected, (Double)run(stellarStatement, variables), 1e-6);
- }
- else {
+ if (expected instanceof Double) {
+ Assert.assertEquals(reason, (Double) expected, (Double) run(stellarStatement, variables), 1e-6);
+ } else {
Assert.assertEquals(reason, expected, run(stellarStatement, variables));
}
}
@@ -135,10 +172,9 @@ public class StellarProcessorUtils {
@Override
public boolean tryAdvance(IntConsumer action) {
boolean isDone = i >= end;
- if(isDone) {
+ if (isDone) {
return false;
- }
- else {
+ } else {
action.accept(i);
i++;
return true;
@@ -148,25 +184,20 @@ public class StellarProcessorUtils {
/**
* {@inheritDoc}
*
- * @param action
- * to {@code IntConsumer} and passed to
- * {@link #tryAdvance(IntConsumer)}; otherwise
- * the action is adapted to an instance of {@code IntConsumer}, by
- * boxing the argument of {@code IntConsumer}, and then passed to
- * {@link #tryAdvance(IntConsumer)}.
+ * @param action to {@code IntConsumer} and passed to {@link #tryAdvance(IntConsumer)};
+ * otherwise the action is adapted to an instance of {@code IntConsumer}, by boxing the
+ * argument of {@code IntConsumer}, and then passed to {@link #tryAdvance(IntConsumer)}.
*/
@Override
public boolean tryAdvance(Consumer<? super Integer> action) {
boolean isDone = i >= end;
- if(isDone) {
+ if (isDone) {
return false;
- }
- else {
+ } else {
action.accept(i);
i++;
return true;
}
}
}
-
}