You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:25 UTC
[37/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
new file mode 100644
index 0000000..8050418
--- /dev/null
+++ b/metron-platform/metron-common/pom.xml
@@ -0,0 +1,300 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-platform</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>metron-common</artifactId>
+ <name>metron-common</name>
+ <description>Components common to all enrichments</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <commons.config.version>1.10</commons.config.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>Metron-Kraken-Repo</id>
+ <name>Metron Kraken Repository</name>
+ <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${global_opencsv_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${global_json_simple_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons.config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.krakenapps</groupId>
+ <artifactId>kraken-pcap</artifactId>
+ <version>${global_pcap_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-simple</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-validator</artifactId>
+ <version>${global_json_schema_validator_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${global_flux_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${global_storm_version}</version>
+ <exclusions>
+ <!-- Exclude storm-kafka's transitive curator-client so the Curator version declared in this pom is used. -->
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.7.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${global_mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-test-utilities</artifactId>
+ <version>0.1BETA</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
+
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <compilerArgument>-Xlint:unchecked</compilerArgument>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>*slf4j*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>.yaml</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/assembly/assembly.xml b/metron-platform/metron-common/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..305f0a6
--- /dev/null
+++ b/metron-platform/metron-common/src/main/assembly/assembly.xml
@@ -0,0 +1,33 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/scripts</directory>
+ <outputDirectory>/scripts</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
new file mode 100644
index 0000000..3418e9c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common;
+
+/**
+ * Constants shared by the metron-common module: configuration names,
+ * Zookeeper paths built from them, and a handful of well-known string keys.
+ */
+public class Constants {
+
+ /** Name of the global configuration; last path segment of {@link #ZOOKEEPER_GLOBAL_ROOT}. */
+ public static final String GLOBAL_CONFIG_NAME = "global";
+ /** Name of the sensor configuration group; last path segment of {@link #ZOOKEEPER_SENSOR_ROOT}. */
+ public static final String SENSORS_CONFIG_NAME = "sensors";
+ /** Root znode under which all Metron paths in this class are built. */
+ public static final String ZOOKEEPER_ROOT = "/metron";
+ /** Parent znode of the global and sensor configuration nodes ("/metron/topology"). */
+ public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
+ /** Znode holding the global configuration ("/metron/topology/global"). */
+ public static final String ZOOKEEPER_GLOBAL_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + GLOBAL_CONFIG_NAME;
+ /** Parent znode of the per-sensor configuration nodes ("/metron/topology/sensors"). */
+ public static final String ZOOKEEPER_SENSOR_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + SENSORS_CONFIG_NAME;
+ /** Default timeout for configured bolts; presumably milliseconds - TODO confirm against the code that reads it. */
+ public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
+ /** Presumably the message field that carries the sensor type - verify against callers. */
+ public static final String SENSOR_TYPE = "source.type";
+ /** Presumably the name of the enrichment topic - verify against callers. */
+ public static final String ENRICHMENT_TOPIC = "enrichments";
+ /** Presumably the id of the stream that error tuples are emitted on - verify against callers. */
+ public static final String ERROR_STREAM = "error";
+ /** Identifier for the simple HBase enrichment; usage is not visible in this file. */
+ public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
+ /** Identifier for the simple HBase threat-intel enrichment; usage is not visible in this file. */
+ public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
new file mode 100644
index 0000000..aa654fb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for Storm bolts whose configuration is stored in Zookeeper.
+ *
+ * <p>{@link #prepare} connects to Zookeeper through Curator, watches
+ * {@link Constants#ZOOKEEPER_TOPOLOGY_ROOT} with a {@link TreeCache} and applies
+ * every added or updated node to {@link #configurations}. Subclasses may override
+ * {@link #reloadCallback} to react to a change.
+ */
+public abstract class ConfiguredBolt extends BaseRichBolt {
+
+  private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
+
+  private String zookeeperUrl;
+
+  protected final Configurations configurations = new Configurations();
+  protected CuratorFramework client;
+  protected TreeCache cache;
+
+  /**
+   * @param zookeeperUrl connection string passed to Curator when no client
+   *                     has been injected via {@link #setCuratorFramework}
+   */
+  public ConfiguredBolt(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+  }
+
+  /** @return the live configurations object that this bolt keeps up to date */
+  public Configurations getConfigurations() {
+    return configurations;
+  }
+
+  /**
+   * Injects the Curator client. When set before {@link #prepare}, no client is
+   * created from the Zookeeper URL.
+   */
+  public void setCuratorFramework(CuratorFramework client) {
+    this.client = client;
+  }
+
+  /**
+   * Injects the tree cache. When set before {@link #prepare}, the bolt neither
+   * registers its own listener nor performs the initial eager load; it only
+   * starts the supplied cache.
+   */
+  public void setTreeCache(TreeCache cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * Hook invoked after a configuration has been updated. The default does nothing.
+   *
+   * @param name last path segment of the changed znode
+   * @param type which kind of configuration changed
+   */
+  public void reloadCallback(String name, Configurations.Type type) {
+  }
+
+  /**
+   * Starts the Curator client and the tree cache, creating either one if it was
+   * not injected beforehand.
+   *
+   * @throws RuntimeException wrapping any failure while starting the client or cache
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      if (client == null) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      }
+      client.start();
+      if (cache == null) {
+        cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+        TreeCacheListener listener = new TreeCacheListener() {
+          @Override
+          public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+            if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+              String path = event.getData().getPath();
+              byte[] data = event.getData().getData();
+              updateConfig(path, data);
+            }
+          }
+        };
+        cache.getListenable().addListener(listener);
+        try {
+          ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+        } catch (Exception e) {
+          // Best effort only: the cache listener will deliver the configs once they
+          // exist. Log the cause so a misconfigured quorum is still diagnosable.
+          LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+        }
+      }
+      cache.start();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Applies the content of one znode to {@link #configurations} and then calls
+   * {@link #reloadCallback}. Nodes without data (null or empty) are ignored.
+   *
+   * @param path full znode path; its last segment is used as the configuration name
+   * @param data raw node content
+   * @throws IOException if the configuration data cannot be loaded
+   */
+  public void updateConfig(String path, byte[] data) throws IOException {
+    // A znode may carry no payload at all; treat null the same as empty.
+    if (data == null || data.length == 0) {
+      return;
+    }
+    String name = path.substring(path.lastIndexOf("/") + 1);
+    Configurations.Type type;
+    if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
+      configurations.updateSensorEnrichmentConfig(name, data);
+      type = Configurations.Type.SENSOR;
+    } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
+      configurations.updateGlobalConfig(data);
+      type = Configurations.Type.GLOBAL;
+    } else {
+      configurations.updateConfig(name, data);
+      type = Configurations.Type.OTHER;
+    }
+    reloadCallback(name, type);
+  }
+
+  /**
+   * Closes the tree cache and the Curator client. Safe to call when
+   * {@link #prepare} never ran or failed before both were created.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      if (cache != null) {
+        cache.close();
+      }
+    } finally {
+      // Release the connection even if closing the cache fails.
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
new file mode 100644
index 0000000..27f4c2a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers for moving Metron configuration between the local file system
+ * and Zookeeper, plus a small command line entry point ({@link #main}).
+ *
+ * <p>Methods that take a Zookeeper URL create, start and close their own
+ * Curator client; methods that take a {@link CuratorFramework} use the supplied
+ * client and leave its lifecycle to the caller.
+ */
+public class ConfigurationsUtils {
+
+  /**
+   * Builds (but does not start) a Curator client for the given quorum using an
+   * exponential backoff retry policy (base sleep 1000 ms, at most 3 retries).
+   */
+  public static CuratorFramework getClient(String zookeeperUrl) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+  }
+
+  /** Serializes the map to JSON and writes it to the global config node. */
+  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+    writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
+  }
+
+  /** Writes the raw bytes to the global config node using a short-lived client. */
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeGlobalConfigToZookeeper(globalConfig, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /** Writes the raw bytes to {@link Constants#ZOOKEEPER_GLOBAL_ROOT} using the supplied client. */
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
+  }
+
+  /** Serializes the sensor config to JSON and writes it to that sensor's node. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+    writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
+  }
+
+  /** Writes the raw bytes to the sensor's node using a short-lived client. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /** Writes the raw bytes to {@code ZOOKEEPER_SENSOR_ROOT/<sensorType>} using the supplied client. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
+  }
+
+  /** Serializes the map to JSON and writes it to {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>}. */
+  public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+    writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+  }
+
+  /** Writes the raw bytes to {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>} using a short-lived client. */
+  public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Sets the data of the node at {@code path}; if the node does not exist yet it
+   * is created, together with any missing parents.
+   */
+  public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
+    try {
+      client.setData().forPath(path, configData);
+    } catch (KeeperException.NoNodeException e) {
+      client.create().creatingParentsIfNeeded().forPath(path, configData);
+    }
+  }
+
+  /**
+   * Loads the global config and the config of every child of the sensor root
+   * from Zookeeper into {@code configurations}.
+   */
+  public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+    configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+    for (String sensorType : sensorTypes) {
+      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
+  /** @return the raw content of the global config node */
+  public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+  }
+
+  /** @return the raw content of the given sensor's config node */
+  public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+  }
+
+  /** @return the raw content of {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>} */
+  public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
+  }
+
+  /** @return the raw content of the node at {@code path} */
+  public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
+    return client.getData().forPath(path);
+  }
+
+  /**
+   * Reads {@code global.json} and every file under the {@code sensors}
+   * sub-directory of {@code rootFilePath} and writes them to Zookeeper.
+   */
+  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+    for (Map.Entry<String, byte[]> sensorConfig : sensorEnrichmentConfigs.entrySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorConfig.getKey(), sensorConfig.getValue(), zookeeperUrl);
+    }
+  }
+
+  /** @return the bytes of {@code <rootFilePath>/global.json} */
+  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+  }
+
+  /**
+   * Reads every file in {@code <rootPath>/sensors}.
+   *
+   * @return file content keyed by file name without extension; empty when the
+   *         sensors directory is missing or cannot be listed
+   */
+  public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+    // listFiles() returns null when the path is not a directory or cannot be read.
+    File[] sensorFiles = new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles();
+    if (sensorFiles != null) {
+      for (File file : sensorFiles) {
+        sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+      }
+    }
+    return sensorEnrichmentConfigs;
+  }
+
+  /**
+   * Prints the global config and every sensor config stored in Zookeeper to
+   * standard output. The client is closed even if a read fails.
+   */
+  public static void dumpConfigs(String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      //Output global configs
+      {
+        System.out.println("Global config");
+        byte[] globalConfigData = client.getData().forPath(Constants.ZOOKEEPER_GLOBAL_ROOT);
+        // Assumes the stored configs are UTF-8 encoded JSON; avoids the platform default charset.
+        System.out.println(new String(globalConfigData, java.nio.charset.StandardCharsets.UTF_8));
+      }
+      //Output sensor specific configs
+      {
+        List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+        for (String child : children) {
+          byte[] data = client.getData().forPath(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + child);
+          System.out.println("Config for source " + child);
+          System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
+          System.out.println();
+        }
+      }
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Command line entry point. With {@code -p DIR} the configs under DIR are
+   * uploaded first; the configs stored in Zookeeper ({@code -z}, required) are
+   * then always dumped to standard output. Exits with -1 on any failure.
+   */
+  public static void main(String[] args) {
+
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\".json");
+      o.setArgName("DIR_NAME");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("z", "zk", true, "Zookeeper Quorum URL (zk1:2181,zk2:2181,...)");
+      o.setArgName("ZK_QUORUM");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+
+    try {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = null;
+      try {
+        cmd = parser.parse(options, args);
+      } catch (ParseException pe) {
+        pe.printStackTrace();
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+        System.exit(-1);
+      }
+      if (cmd.hasOption("h")) {
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+        System.exit(0);
+      }
+
+      String zkQuorum = cmd.getOptionValue("z");
+      if (cmd.hasOption("p")) {
+        String sourcePath = cmd.getOptionValue("p");
+        uploadConfigsToZookeeper(sourcePath, zkQuorum);
+      }
+
+      ConfigurationsUtils.dumpConfigs(zkQuorum);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.exit(-1);
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
new file mode 100644
index 0000000..1ccf47b
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+/**
+ * A {@link Configurations} that can refresh itself from one of two sources,
+ * chosen by the constructor used: Zookeeper (through a Curator client) or a
+ * directory of configuration files.
+ */
+public class Configuration extends Configurations {
+
+  protected CuratorFramework curatorFramework = null;
+  private Path configFileRoot;
+
+  /** Creates a configuration that is refreshed from Zookeeper via the given client. */
+  public Configuration(CuratorFramework curatorFramework){
+    this.curatorFramework = curatorFramework;
+  }
+
+  /** Creates a configuration that is refreshed from files under the given root directory. */
+  public Configuration(Path configFileRoot){
+    this.configFileRoot = configFileRoot;
+  }
+
+  /**
+   * Reloads the global config and all sensor enrichment configs. Zookeeper is
+   * used when a Curator client was supplied; otherwise the files under the
+   * configured root are read.
+   *
+   * @throws Exception if reading from Zookeeper or the file system fails
+   */
+  public void update() throws Exception {
+    if (curatorFramework != null) {
+      ConfigurationsUtils.updateConfigsFromZookeeper(this, curatorFramework);
+      return;
+    }
+
+    // File-backed mode: resolve the root once and reuse it for both reads.
+    final String rootPath = configFileRoot.toAbsolutePath().toString();
+    updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(rootPath));
+    Map<String, byte[]> configsBySensor = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(rootPath);
+    for (Map.Entry<String, byte[]> sensorConfig : configsBySensor.entrySet()) {
+      updateSensorEnrichmentConfig(sensorConfig.getKey(), sensorConfig.getValue());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
new file mode 100644
index 0000000..6aaa2b4
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * In-memory holder for named configurations.
+ *
+ * <p>The global config, the per-sensor enrichment configs and any other named
+ * config all live in one map keyed by name, so a sensor type or config name
+ * equal to {@link #GLOBAL_CONFIG_NAME} would overwrite the global config.
+ * The typed getters cast the stored value and therefore throw
+ * {@link ClassCastException} if a name was populated through a different
+ * {@code update*} family.
+ */
+public class Configurations implements Serializable {
+
+  private static final Logger LOG = Logger.getLogger(Configurations.class);
+
+  public enum Type {
+    GLOBAL, SENSOR, OTHER
+  }
+
+  /** Key under which the global config is stored. */
+  public static final String GLOBAL_CONFIG_NAME = "global";
+
+  /** Backing store; a ConcurrentHashMap, so neither null keys nor null values are accepted. */
+  private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+
+  /**
+   * @return the global config, or {@code null} if none has been set
+   */
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getGlobalConfig() {
+    return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
+  }
+
+  /**
+   * Parses {@code data} as a JSON object and stores it as the global config.
+   *
+   * @param data JSON document bytes; must not be {@code null}
+   * @throws IllegalStateException if {@code data} is {@code null}
+   * @throws IOException if the bytes cannot be parsed
+   */
+  public void updateGlobalConfig(byte[] data) throws IOException {
+    // Same explicit validation as updateConfig(String, byte[]) instead of a bare NPE.
+    if (data == null) throw new IllegalStateException("global config data cannot be null");
+    updateGlobalConfig(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Parses the stream as a JSON object and stores it as the global config.
+   *
+   * @param io stream holding a JSON document
+   * @throws IOException if the stream cannot be read or parsed
+   */
+  public void updateGlobalConfig(InputStream io) throws IOException {
+    Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
+    });
+    updateGlobalConfig(globalConfig);
+  }
+
+  /**
+   * Stores {@code globalConfig} as the global config, replacing any previous one.
+   *
+   * @param globalConfig the new global config; a {@code null} value is rejected by the backing map
+   */
+  public void updateGlobalConfig(Map<String, Object> globalConfig) {
+    configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
+  }
+
+  /**
+   * @param sensorType name the config was stored under
+   * @return the enrichment config for {@code sensorType}, or {@code null} if none has been set
+   */
+  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+    return (SensorEnrichmentConfig) configurations.get(sensorType);
+  }
+
+  /**
+   * Parses {@code data} into a {@link SensorEnrichmentConfig} and stores it under {@code sensorType}.
+   *
+   * @param sensorType name to store the config under
+   * @param data JSON document bytes; must not be {@code null}
+   * @throws IllegalStateException if {@code data} is {@code null}
+   * @throws IOException if the bytes cannot be parsed
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+    // Same explicit validation as updateConfig(String, byte[]) instead of a bare NPE.
+    if (data == null) throw new IllegalStateException("sensor enrichment config data cannot be null");
+    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Parses the stream into a {@link SensorEnrichmentConfig} and stores it under {@code sensorType}.
+   *
+   * @param sensorType name to store the config under
+   * @param io stream holding a JSON document
+   * @throws IOException if the stream cannot be read or parsed
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+    SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+  }
+
+  /**
+   * Stores {@code sensorEnrichmentConfig} under {@code sensorType}, replacing any previous value.
+   *
+   * @param sensorType name to store the config under
+   * @param sensorEnrichmentConfig the new config; a {@code null} value is rejected by the backing map
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+    configurations.put(sensorType, sensorEnrichmentConfig);
+  }
+
+  /**
+   * @param name name the config was stored under
+   * @return the config stored under {@code name}, or {@code null} if none has been set
+   */
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getConfig(String name) {
+    return (Map<String, Object>) configurations.get(name);
+  }
+
+  /**
+   * Parses {@code data} as a JSON object and stores it under {@code name}.
+   *
+   * @param name name to store the config under
+   * @param data JSON document bytes; must not be {@code null}
+   * @throws IllegalStateException if {@code data} is {@code null}
+   * @throws IOException if the bytes cannot be parsed
+   */
+  public void updateConfig(String name, byte[] data) throws IOException {
+    if (data == null) throw new IllegalStateException("config data cannot be null");
+    Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
+    updateConfig(name, config);
+  }
+
+  /**
+   * Stores {@code config} under {@code name}, replacing any previous value.
+   *
+   * @param name name to store the config under
+   * @param config the new config; a {@code null} value is rejected by the backing map
+   */
+  public void updateConfig(String name, Map<String, Object> config) {
+    configurations.put(name, config);
+  }
+
+  /** Two instances are equal when they are of the same class and hold equal configuration maps. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Configurations that = (Configurations) o;
+    return configurations.equals(that.configurations);
+  }
+
+  @Override
+  public int hashCode() {
+    return configurations.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return configurations.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
new file mode 100644
index 0000000..2ead81e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import com.google.common.base.Joiner;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class EnrichmentConfig {
+ public static enum Type {
+ THREAT_INTEL
+ ,ENRICHMENT
+ }
+
+ protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
+ public static class FieldList {
+ Type type;
+ Map<String, List<String>> fieldToEnrichmentTypes;
+
+ public Type getType() {
+ return type;
+ }
+
+ public void setType(Type type) {
+ this.type = type;
+ }
+
+ public Map<String, List<String>> getFieldToEnrichmentTypes() {
+ return fieldToEnrichmentTypes;
+ }
+
+ public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
+ this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
+ }
+ }
+ public String zkQuorum;
+ public Map<String, FieldList> sensorToFieldList;
+
+ public String getZkQuorum() {
+ return zkQuorum;
+ }
+
+ public void setZkQuorum(String zkQuorum) {
+ this.zkQuorum = zkQuorum;
+ }
+
+ public Map<String, FieldList> getSensorToFieldList() {
+ return sensorToFieldList;
+ }
+
+ public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
+ this.sensorToFieldList = sensorToFieldList;
+ }
+
+ public void updateSensorConfigs( ) throws Exception {
+ CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
+ try {
+ client.start();
+ updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
+ }
+ finally {
+ client.close();
+ }
+ }
+
+ public static interface SourceConfigHandler {
+ SensorEnrichmentConfig readConfig(String sensor) throws Exception;
+ void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
+ }
+
+ public static class ZKSourceConfigHandler implements SourceConfigHandler {
+ CuratorFramework client;
+ public ZKSourceConfigHandler(CuratorFramework client) {
+ this.client = client;
+ }
+ @Override
+ public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
+ return SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
+ }
+
+ @Override
+ public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.toJSON().getBytes(), client);
+ }
+ }
+
+ public static void updateSensorConfigs( SourceConfigHandler scHandler
+ , Map<String, FieldList> sensorToFieldList
+ ) throws Exception
+ {
+ Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
+ for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
+ SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
+ if(config == null) {
+ config = scHandler.readConfig(kv.getKey());
+ if(_LOG.isDebugEnabled()) {
+ _LOG.debug(config.toJSON());
+ }
+ }
+ Map<String, List<String> > fieldMap = null;
+ Map<String, List<String> > fieldToTypeMap = null;
+ List<String> fieldList = null;
+ if(kv.getValue().type == Type.THREAT_INTEL) {
+ fieldMap = config.getThreatIntelFieldMap();
+ if(fieldMap!= null) {
+ fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
+ }
+ if(fieldList == null) {
+ fieldList = new ArrayList<>();
+ fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
+ }
+ fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
+ if(fieldToTypeMap == null) {
+ fieldToTypeMap = new HashMap<>();
+ config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
+ }
+ }
+ else if(kv.getValue().type == Type.ENRICHMENT) {
+ fieldMap = config.getEnrichmentFieldMap();
+ if(fieldMap!= null) {
+ fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
+ }
+ if(fieldList == null) {
+ fieldList = new ArrayList<>();
+ fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
+ }
+ fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
+ if(fieldToTypeMap == null) {
+ fieldToTypeMap = new HashMap<>();
+ config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
+ }
+ }
+ if(fieldToTypeMap == null || fieldMap == null) {
+ _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
+ continue;
+ }
+ //Add the additional fields to the field list associated with the hbase adapter
+ {
+ HashSet<String> fieldSet = new HashSet<>(fieldList);
+ List<String> additionalFields = new ArrayList<>();
+ for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
+ if (!fieldSet.contains(field)) {
+ additionalFields.add(field);
+ }
+ }
+ //adding only the ones that we don't already have to the field list
+ if (additionalFields.size() > 0) {
+ _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
+ fieldList.addAll(additionalFields);
+ sourceConfigsChanged.put(kv.getKey(), config);
+ }
+ }
+ //Add the additional enrichment types to the mapping between the fields
+ {
+ for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
+ String field = fieldToType.getKey();
+ final HashSet<String> types = new HashSet<>(fieldToType.getValue());
+ int sizeBefore = 0;
+ if(fieldToTypeMap.containsKey(field)) {
+ List<String> typeList = fieldToTypeMap.get(field);
+ sizeBefore = new HashSet<>(typeList).size();
+ types.addAll(typeList);
+ }
+ int sizeAfter = types.size();
+ boolean changed = sizeBefore != sizeAfter;
+ if(changed) {
+ fieldToTypeMap.put(field, new ArrayList<String>() {{
+ addAll(types);
+ }});
+ sourceConfigsChanged.put(kv.getKey(), config);
+ }
+ }
+ }
+ }
+ for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
+ scHandler.persistConfig(kv.getKey(), kv.getValue());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
new file mode 100644
index 0000000..bc30327
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bean holding the enrichment settings of a single sensor. It is converted
+ * from and to JSON through {@link JSONUtils}.
+ */
+public class SensorEnrichmentConfig {
+
+  private String index;
+  private Map<String, List<String>> enrichmentFieldMap;
+  private Map<String, List<String>> threatIntelFieldMap;
+  private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
+  private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
+  private int batchSize;
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /** @return the enrichment field map; {@code null} until one is set */
+  public Map<String, List<String>> getEnrichmentFieldMap() {
+    return enrichmentFieldMap;
+  }
+
+  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+    this.enrichmentFieldMap = enrichmentFieldMap;
+  }
+
+  /** @return the threat intel field map; {@code null} until one is set */
+  public Map<String, List<String>> getThreatIntelFieldMap() {
+    return threatIntelFieldMap;
+  }
+
+  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+    this.threatIntelFieldMap = threatIntelFieldMap;
+  }
+
+  /** @return the field-to-enrichment-type map; initially an empty map */
+  public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
+    return fieldToEnrichmentTypeMap;
+  }
+
+  /** @return the field-to-threat-intel-type map; initially an empty map */
+  public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
+    return fieldToThreatIntelTypeMap;
+  }
+
+  public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
+    this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
+  }
+
+  public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
+    this.fieldToThreatIntelTypeMap = fieldToThreatIntelTypeMap;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Builds a config from its JSON representation.
+   *
+   * @param config JSON document bytes, decoded as UTF-8
+   * @return the parsed config
+   * @throws IOException if the document cannot be parsed
+   */
+  public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
+    // Decode explicitly as UTF-8 instead of relying on the JVM's platform-default charset.
+    String json = new String(config, java.nio.charset.StandardCharsets.UTF_8);
+    return JSONUtils.INSTANCE.load(json, SensorEnrichmentConfig.class);
+  }
+
+  /**
+   * @return this config as pretty-printed JSON
+   * @throws JsonProcessingException if serialization fails
+   */
+  public String toJSON() throws JsonProcessingException {
+    return JSONUtils.INSTANCE.toJSON(this, true);
+  }
+
+  /** Two configs are equal when they are of the same class and all six properties are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
+
+    if (getBatchSize() != that.getBatchSize()) return false;
+    if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
+    if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
+      return false;
+    if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
+      return false;
+    if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
+      return false;
+    return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
+
+  }
+
+  @Override
+  public String toString() {
+    // Every property is rendered as name=value; threatIntelFieldMap previously lacked the '='.
+    return "{index=" + index + ", batchSize=" + batchSize +
+            ", enrichmentFieldMap=" + enrichmentFieldMap +
+            ", threatIntelFieldMap=" + threatIntelFieldMap +
+            ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
+            ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getIndex() != null ? getIndex().hashCode() : 0;
+    result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
+    result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
+    result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
+    result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
+    result = 31 * result + getBatchSize();
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
new file mode 100644
index 0000000..6fb3d78
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Contract for a writer that receives messages of type {@code T} in batches.
+ * Extends {@link AutoCloseable}, so implementations must also provide {@code close()}.
+ *
+ * @param <T> the message type handed to {@link #write}
+ */
+public interface BulkMessageWriter<T> extends AutoCloseable {
+
+  /**
+   * Prepares the writer for use.
+   *
+   * @param stormConf raw configuration map (presumably the Storm topology configuration; confirm against callers)
+   * @param configuration the configurations available at initialization time
+   * @throws Exception if initialization fails
+   */
+  void init(Map stormConf, Configurations configuration) throws Exception;
+
+  /**
+   * Writes a batch of messages.
+   *
+   * @param sensorType the sensor type the batch belongs to
+   * @param configurations the configurations to use for this write
+   * @param tuples the tuples associated with the batch (presumably parallel to {@code messages}; confirm against implementations)
+   * @param messages the messages to write
+   * @throws Exception if the write fails
+   */
+  void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
new file mode 100644
index 0000000..a90a8cb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+
+/**
+ * Contract for a writer that receives messages of type {@code T} one at a time.
+ * Extends {@link AutoCloseable}, so implementations must also provide {@code close()}.
+ *
+ * @param <T> the message type handed to {@link #write}
+ */
+public interface MessageWriter<T> extends AutoCloseable {
+
+  /** Prepares the writer for use; takes no arguments and declares no checked exceptions. */
+  void init();
+
+  /**
+   * Writes a single message.
+   *
+   * @param sensorType the sensor type the message belongs to
+   * @param configurations the configurations to use for this write
+   * @param tuple the tuple associated with the message
+   * @param message the message to write
+   * @throws Exception if the write fails
+   */
+  void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
new file mode 100644
index 0000000..6e139c8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+
+public class ErrorUtils {
+
+ @SuppressWarnings("unchecked")
+ public static JSONObject generateErrorMessage(String message, Throwable t)
+ {
+ JSONObject error_message = new JSONObject();
+
+ /*
+ * Save full stack trace in object.
+ */
+ String stackTrace = ExceptionUtils.getStackTrace(t);
+
+ String exception = t.toString();
+
+ error_message.put("time", System.currentTimeMillis());
+ try {
+ error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException ex) {
+ // TODO Auto-generated catch block
+ ex.printStackTrace();
+ }
+
+ error_message.put("message", message);
+ error_message.put(Constants.SENSOR_TYPE, "error");
+ error_message.put("exception", exception);
+ error_message.put("stack", stackTrace);
+
+ return error_message;
+ }
+
+ public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+ JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t);
+ collector.emit(errorStream, new Values(error));
+ collector.reportError(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
new file mode 100644
index 0000000..4af9ad1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.*;
+
+/**
+ * Singleton facade over Jackson's {@link ObjectMapper} for reading and writing JSON.
+ * Every thread works with its own {@link ObjectMapper} instance.
+ */
+public enum JSONUtils {
+  INSTANCE;
+
+  /** Per-thread mapper, created lazily the first time a thread calls {@code get()}. */
+  private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+    /**
+     * Supplies the mapper for a thread that has not used one yet.
+     *
+     * @return a new {@link ObjectMapper} with default settings
+     */
+    @Override
+    protected ObjectMapper initialValue() {
+      return new ObjectMapper();
+    }
+  };
+
+  /**
+   * Parses JSON from a stream into the type described by {@code ref}.
+   *
+   * @throws IOException if the stream cannot be read or parsed
+   */
+  public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  /**
+   * Parses a JSON string into the type described by {@code ref}.
+   *
+   * @throws IOException if the string cannot be parsed
+   */
+  public <T> T load(String is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  /**
+   * Parses the JSON contents of a file into the type described by {@code ref}.
+   *
+   * @throws IOException if the file cannot be opened, read or parsed
+   */
+  public <T> T load(File f, TypeReference<T> ref) throws IOException {
+    // Close the file stream deterministically, including when parsing fails.
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return load(is, ref);
+    }
+  }
+
+  /**
+   * Parses JSON from a stream into an instance of {@code clazz}.
+   *
+   * @throws IOException if the stream cannot be read or parsed
+   */
+  public <T> T load(InputStream is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Parses the JSON contents of a file into an instance of {@code clazz}.
+   *
+   * @throws IOException if the file cannot be opened, read or parsed
+   */
+  public <T> T load(File f, Class<T> clazz) throws IOException {
+    // Close the file stream deterministically, including when parsing fails.
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return load(is, clazz);
+    }
+  }
+
+  /**
+   * Parses a JSON string into an instance of {@code clazz}.
+   *
+   * @throws IOException if the string cannot be parsed
+   */
+  public <T> T load(String is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Serializes {@code o} to a JSON string.
+   *
+   * @param o the object to serialize
+   * @param pretty {@code true} to use the default pretty printer, {@code false} for the mapper's default output
+   * @throws JsonProcessingException if serialization fails
+   */
+  public String toJSON(Object o, boolean pretty) throws JsonProcessingException {
+    if(pretty) {
+      return _mapper.get().writerWithDefaultPrettyPrinter().writeValueAsString(o);
+    }
+    else {
+      return _mapper.get().writeValueAsString(o);
+    }
+  }
+
+  /**
+   * Serializes {@code config} to JSON bytes.
+   *
+   * @throws JsonProcessingException if serialization fails
+   */
+  public byte[] toJSON(Object config) throws JsonProcessingException {
+    return _mapper.get().writeValueAsBytes(config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
new file mode 100644
index 0000000..df711fa
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+
+/** Accessors for well-known fields of a JSON message. */
+public class MessageUtils {
+
+  /**
+   * Reads the value stored under {@link Constants#SENSOR_TYPE}.
+   *
+   * @param message the message to inspect
+   * @return the stored value cast to {@link String}, or {@code null} if the key is absent
+   */
+  public static String getSensorType(JSONObject message) {
+    final Object sensorType = message.get(Constants.SENSOR_TYPE);
+    return (String) sensorType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
new file mode 100644
index 0000000..2afa097
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class ReflectionUtils<T> {
+
+ public static <T> T createInstance(String className, T defaultClass) {
+ T instance;
+ if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+ return defaultClass;
+ }
+ else {
+ try {
+ Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+ instance = clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Unable to instantiate connector.", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalStateException("Unable to instantiate connector", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+ }
+ }
+ return instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh b/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
new file mode 100755
index 0000000..4a928bd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Directory holding per-service default files. The "-" form only applies the
+# default when the variable is unset, so an explicitly empty value disables
+# the sourcing below.
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+# Source the hbase defaults file when the directory is configured and the file
+# is readable. Two separate tests replace the obsolescent "-a" operator, and
+# every expansion is quoted so paths with spaces or glob characters survive.
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+# Locations of the Metron install and of the jar that provides the CLI class.
+export METRON_VERSION=0.1BETA
+export METRON_HOME="/usr/metron/$METRON_VERSION"
+export PARSERS_JAR="metron-parsers-$METRON_VERSION.jar"
+# Keep a caller-supplied ZK_HOME; otherwise fall back to the default below.
+export ZK_HOME="${ZK_HOME:-/usr/hdp/current/hbase-client}"
+
+# Run the configuration CLI, forwarding all script arguments untouched.
+# NOTE(review): this uses whichever "java" is first on PATH, not the JAVA_HOME
+# detected above -- confirm that is intended.
+java -cp "$METRON_HOME/lib/$PARSERS_JAR" org.apache.metron.common.cli.ConfigurationsUtils "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
new file mode 100644
index 0000000..a791086
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.common.Constants;
+import org.apache.metron.TestConstants;
+import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.SensorEnrichmentConfig;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Checks that a {@link ConfiguredBolt} loads its configurations from ZooKeeper
+ * on prepare and reports later changes through {@code reloadCallback}.
+ */
+public class ConfiguredBoltTest extends BaseEnrichmentBoltTest {
+  // Names reported to reloadCallback since the last reset.
+  // NOTE(review): the callback may run on a different thread than the test;
+  // this is a plain HashSet polled without synchronization -- confirm.
+  private static Set<String> configsUpdated = new HashSet<>();
+  // Every configuration name written to ZooKeeper during setup.
+  private Set<String> allConfigurationTypes = new HashSet<>();
+  private String zookeeperUrl;
+  // Kept as a field so the server can be shut down after each test.
+  private TestingServer testZkServer;
+
+  /** Minimal concrete bolt that only records which configurations were reloaded. */
+  public static class StandAloneConfiguredBolt extends ConfiguredBolt {
+
+    public StandAloneConfiguredBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    @Override
+    public void reloadCallback(String name, Configurations.Type type) {
+      configsUpdated.add(name);
+    }
+  }
+
+  /**
+   * Creates a test ZooKeeper server and writes the sample global and sensor
+   * enrichment configurations to it, remembering each name that was written.
+   */
+  @Before
+  public void setupConfiguration() throws Exception {
+    testZkServer = new TestingServer(true);
+    this.zookeeperUrl = testZkServer.getConnectString();
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+      allConfigurationTypes.add(sensorType);
+    }
+  }
+
+  /**
+   * Closes the test ZooKeeper server created in {@link #setupConfiguration()}
+   * so that servers do not accumulate across tests.
+   */
+  @org.junit.After
+  public void stopTestZookeeper() throws Exception {
+    // Guard against setup having failed before the server was created.
+    if (testZkServer != null) {
+      testZkServer.close();
+      testZkServer = null;
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configurations sampleConfigurations = new Configurations();
+    try {
+      StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
+      configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      Assert.fail("A valid zookeeper url must be supplied");
+    } catch (RuntimeException expected) {
+      // Preparing with a null ZooKeeper url is expected to be rejected.
+    }
+
+    // Build the expected state from the same sample files written during setup.
+    configsUpdated = new HashSet<>();
+    sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+    }
+
+    // Initial load: every configuration written in setup must be reported.
+    StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(zookeeperUrl);
+    configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    waitForConfigUpdate(allConfigurationTypes);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+
+    // Add a field to the global config.
+    configsUpdated = new HashSet<>();
+    Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
+    sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+
+    // Remove the field again.
+    configsUpdated = new HashSet<>();
+    sampleGlobalConfig.remove("newGlobalField");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+
+    // Add a sensor enrichment config that did not exist before.
+    configsUpdated = new HashSet<>();
+    String sensorType = "testSensorConfig";
+    SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
+    testSensorConfig.setBatchSize(50);
+    testSensorConfig.setIndex("test");
+    Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
+    enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
+      add("enrichmentField");
+    }});
+    testSensorConfig.setEnrichmentFieldMap(enrichmentFieldMap);
+    Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
+    threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
+      add("threatIntelField");
+    }});
+    testSensorConfig.setThreatIntelFieldMap(threatIntelFieldMap);
+    sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
+    waitForConfigUpdate(sensorType);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+
+    // Add an arbitrary named config.
+    configsUpdated = new HashSet<>();
+    String someConfigType = "someConfig";
+    Map<String, Object> someConfig = new HashMap<>();
+    someConfig.put("someField", "someValue");
+    sampleConfigurations.updateConfig(someConfigType, someConfig);
+    ConfigurationsUtils.writeConfigToZookeeper(someConfigType, someConfig, zookeeperUrl);
+    waitForConfigUpdate(someConfigType);
+    Assert.assertEquals("Add new misc config", sampleConfigurations, configuredBolt.configurations);
+    configuredBolt.cleanup();
+  }
+
+  /** Waits until exactly the one named configuration has been reported as reloaded. */
+  private void waitForConfigUpdate(final String expectedConfigUpdate) {
+    waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
+  }
+
+  /**
+   * Polls every 500 ms until the reported reloads equal the expected set,
+   * failing the test once the retry budget is exhausted.
+   *
+   * @param expectedConfigUpdates exact set of configuration names that must have been reloaded
+   */
+  private void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
+    int count = 0;
+    while (!configsUpdated.equals(expectedConfigUpdates)) {
+      if (count++ > 5) {
+        // Assert.fail always throws, so nothing after it is needed.
+        Assert.fail("ConfiguredBolt was not updated in time");
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop waiting instead of looping on.
+        Thread.currentThread().interrupt();
+        Assert.fail("Interrupted while waiting for ConfiguredBolt to be updated");
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
new file mode 100644
index 0000000..bee4af7
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.cli;
+
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Round-trips configurations through a test ZooKeeper server using
+ * {@link ConfigurationsUtils}, both via its static helpers and via its
+ * command-line entry point.
+ */
+public class ConfigurationsUtilsTest {
+
+  private TestingServer testZkServer;
+  private String zookeeperUrl;
+  private CuratorFramework client;
+  private byte[] testGlobalConfig;
+  private Map<String, byte[]> testSensorConfigMap;
+
+  /** Creates the test server, starts a client against it and reads the sample configs from disk. */
+  @Before
+  public void setup() throws Exception {
+    testZkServer = new TestingServer(true);
+    zookeeperUrl = testZkServer.getConnectString();
+    client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    testGlobalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+  }
+
+  /** Writes a global, a sensor and an arbitrary config and reads each one back unchanged. */
+  @Test
+  public void test() throws Exception {
+    // Global config
+    Assert.assertTrue(testGlobalConfig.length > 0);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(testGlobalConfig, zookeeperUrl);
+    assertSameBytes(testGlobalConfig, ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client));
+
+    // Sensor enrichment config for a single sensor type
+    Assert.assertTrue(testSensorConfigMap.size() > 0);
+    String sensor = "yaf";
+    byte[] sensorBytes = testSensorConfigMap.get(sensor);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, sensorBytes, zookeeperUrl);
+    assertSameBytes(sensorBytes, ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
+
+    // Arbitrary named config with mixed value types
+    String configName = "testConfig";
+    Map<String, Object> configValues = new HashMap<>();
+    configValues.put("stringField", "value");
+    configValues.put("intField", 1);
+    configValues.put("doubleField", 1.1);
+    ConfigurationsUtils.writeConfigToZookeeper(configName, configValues, zookeeperUrl);
+    byte[] storedBytes = ConfigurationsUtils.readConfigBytesFromZookeeper(configName, client);
+    assertSameBytes(JSONUtils.INSTANCE.toJSON(configValues), storedBytes);
+  }
+
+  /** Loads the sample configs through {@code main} and verifies what ended up in ZooKeeper. */
+  @Test
+  public void testCmdLine() throws Exception {
+    ConfigurationsUtils.main(new String[] {"-z", zookeeperUrl, "-p", TestConstants.SAMPLE_CONFIG_PATH});
+    assertSameBytes(testGlobalConfig, ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client));
+    for (Map.Entry<String, byte[]> sensorConfig : testSensorConfigMap.entrySet()) {
+      byte[] storedBytes = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensorConfig.getKey(), client);
+      assertSameBytes(sensorConfig.getValue(), storedBytes);
+    }
+  }
+
+  /** Releases the client first, then the test server. */
+  @After
+  public void tearDown() throws IOException {
+    client.close();
+    testZkServer.close();
+    testZkServer.stop();
+  }
+
+  /** Asserts that two byte arrays have identical contents. */
+  private static void assertSameBytes(byte[] expected, byte[] actual) {
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
new file mode 100644
index 0000000..fb45ccc
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies that {@link Configuration} ends up with the same single global
+ * property whether it is updated from a directory on disk or from a (mocked)
+ * Curator client.
+ */
+public class ConfigurationTest {
+
+  private static final String TEST_PROPERTY = "configuration.class.test.property";
+  private static final String TEST_VALUE = "Configuration";
+
+  /** Updates from the test resources directory and checks the global config. */
+  @Test
+  public void testCanReadFromFile() throws Exception {
+    Configuration fromDisk = new Configuration(Paths.get("./src/test/resources/config/"));
+    fromDisk.update();
+    checkResult(fromDisk);
+  }
+
+  /** Updates through a mocked Curator client that serves the global config bytes. */
+  @Test
+  public void testCanReadFromZookeeper() throws Exception {
+    // Builders handed out by the mocked client.
+    ExistsBuilder existsStub = mock(ExistsBuilder.class);
+    GetDataBuilder dataStub = mock(GetDataBuilder.class);
+    GetChildrenBuilder childrenStub = mock(GetChildrenBuilder.class);
+    when(dataStub.forPath(Constants.ZOOKEEPER_GLOBAL_ROOT)).thenReturn(mockGlobalData());
+    when(childrenStub.forPath(anyString())).thenReturn(Collections.<String> emptyList());
+
+    // The client itself only routes to the builders above.
+    CuratorFramework zkClient = mock(CuratorFramework.class);
+    when(zkClient.checkExists()).thenReturn(existsStub);
+    when(zkClient.getData()).thenReturn(dataStub);
+    when(zkClient.getChildren()).thenReturn(childrenStub);
+
+    Configuration fromZookeeper = new Configuration(Paths.get("foo"));
+    fromZookeeper.curatorFramework = zkClient;
+    fromZookeeper.update();
+
+    checkResult(fromZookeeper);
+  }
+
+  /** Serializes a JSON object holding only the test property. */
+  private byte[] mockGlobalData() {
+    JSONObject payload = new JSONObject();
+    payload.put(TEST_PROPERTY, TEST_VALUE);
+    String json = payload.toString();
+    return json.getBytes();
+  }
+
+  /** Asserts the global config has exactly one entry and that it carries the test value. */
+  private void checkResult(Configuration configuration) {
+    assertEquals("File contains 1 entry: ", 1, configuration.getGlobalConfig().size());
+
+    String actualValue = configuration.getGlobalConfig().get(TEST_PROPERTY).toString();
+    String expectation = TEST_PROPERTY + " should be \"" + TEST_VALUE + "\"";
+    assertEquals(expectation, TEST_VALUE, actualValue);
+  }
+}
+