You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/05 21:42:10 UTC
[14/15] incubator-metron git commit: METRON 86: Adding Solr indexing
support (merrimanr via cestella) closes apache/incubator-metron#67
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 30c8e23..a832ebb 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -23,17 +23,16 @@ import backtype.storm.topology.base.BaseRichBolt;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.apache.metron.Constants;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.utils.ConfigurationsUtils;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
public abstract class ConfiguredBolt extends BaseRichBolt {
@@ -41,51 +40,62 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
private String zookeeperUrl;
+ private long timeout = Constants.DEFAULT_CONFIGURED_BOLT_TIMEOUT;
- protected Map<String, SourceConfig> configurations = Collections.synchronizedMap(new HashMap<String, SourceConfig>());
+ protected final Configurations configurations = new Configurations();
private CuratorFramework client;
- private PathChildrenCache cache;
+ private TreeCache cache;
public ConfiguredBolt(String zookeeperUrl) {
this.zookeeperUrl = zookeeperUrl;
}
+ public ConfiguredBolt withTimeout(long timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
client.start();
- cache = new PathChildrenCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT, true);
- PathChildrenCacheListener listener = new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
- byte[] data = event.getData().getData();
- if (data != null) {
- SourceConfig temp = SourceConfig.load(data);
- if (temp != null) {
- String[] path = event.getData().getPath().split("/");
- configurations.put(path[path.length - 1], temp);
- }
+ try {
+ ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+ cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+ TreeCacheListener listener = new TreeCacheListener() {
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+ String path = event.getData().getPath();
+ byte[] data = event.getData().getData();
+ updateConfig(path, data);
}
}
- }
- };
- cache.getListenable().addListener(listener);
- try {
+ };
+ cache.getListenable().addListener(listener);
cache.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ public void updateConfig(String path, byte[] data) throws IOException {
+ if (data.length != 0 && path != null) {
+ String name = path.substring(path.lastIndexOf("/") + 1);
+ if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
+ configurations.updateSensorEnrichmentConfig(name, data);
+ } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
+ configurations.updateGlobalConfig(data);
+ } else {
+ configurations.updateConfig(name, data);
+ }
+ }
+ }
+
@Override
public void cleanup() {
- try {
- cache.close();
- client.close();
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
+ cache.close();
+ client.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
new file mode 100644
index 0000000..d93cc5f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.log4j.Logger;
+import org.apache.metron.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class Configurations implements Serializable {
+
+ private static final Logger LOG = Logger.getLogger(Configurations.class);
+
+ public static final String GLOBAL_CONFIG_NAME = "global";
+
+ private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getGlobalConfig() {
+ return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
+ }
+
+ public void updateGlobalConfig(byte[] data) throws IOException {
+ updateGlobalConfig(new ByteArrayInputStream(data));
+ }
+
+ public void updateGlobalConfig(InputStream io) throws IOException {
+ Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {});
+ updateGlobalConfig(globalConfig);
+ }
+
+ public void updateGlobalConfig(Map<String, Object> globalConfig) {
+ configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
+ }
+
+ public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+ return (SensorEnrichmentConfig) configurations.get(sensorType);
+ }
+
+ public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+ updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+ }
+
+ public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+ SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+ updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+ }
+
+ public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+ configurations.put(sensorType, sensorEnrichmentConfig);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> getConfig(String name) {
+ return (Map<String, Object>) configurations.get(name);
+ }
+
+ public void updateConfig(String name, byte[] data) {
+ try {
+ Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>(){});
+ updateConfig(name, config);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void updateConfig(String name, Map<String, Object> config) {
+ configurations.put(name, config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
new file mode 100644
index 0000000..b24e8a8
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import java.util.List;
+import java.util.Map;
+
+public class SensorEnrichmentConfig {
+
+ private String index;
+ private Map<String, List<String>> enrichmentFieldMap;
+ private Map<String, List<String>> threatIntelFieldMap;
+ private int batchSize;
+
+ public String getIndex() {
+ return index;
+ }
+
+ public void setIndex(String index) {
+ this.index = index;
+ }
+
+ public Map<String, List<String>> getEnrichmentFieldMap() {
+ return enrichmentFieldMap;
+ }
+
+ public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+ this.enrichmentFieldMap = enrichmentFieldMap;
+ }
+
+ public Map<String, List<String>> getThreatIntelFieldMap() {
+ return threatIntelFieldMap;
+ }
+
+ public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+ this.threatIntelFieldMap = threatIntelFieldMap;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
deleted file mode 100644
index 8e1a960..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.yaml.snakeyaml.TypeDescription;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-public class SourceConfig {
-
- final static ObjectMapper _mapper = new ObjectMapper();
-
- private String index;
- private Map<String, List<String>> enrichmentFieldMap;
- private Map<String, List<String>> threatIntelFieldMap;
- private int batchSize;
-
- public String getIndex() {
- return index;
- }
-
- public void setIndex(String index) {
- this.index = index;
- }
-
- public Map<String, List<String>> getEnrichmentFieldMap() {
- return enrichmentFieldMap;
- }
-
- public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
- this.enrichmentFieldMap = enrichmentFieldMap;
- }
-
- public Map<String, List<String>> getThreatIntelFieldMap() {
- return threatIntelFieldMap;
- }
-
- public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
- this.threatIntelFieldMap = threatIntelFieldMap;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- public static synchronized SourceConfig load(InputStream is) throws IOException {
- SourceConfig ret = _mapper.readValue(is, SourceConfig.class);
- return ret;
- }
-
- public static synchronized SourceConfig load(byte[] data) throws IOException {
- return load( new ByteArrayInputStream(data));
- }
-
- public static synchronized SourceConfig load(String s, Charset c) throws IOException {
- return load( s.getBytes(c));
- }
- public static synchronized SourceConfig load(String s) throws IOException {
- return load( s, Charset.defaultCharset());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
index b02cbaf..a1b7ccc 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
@@ -49,7 +49,7 @@ public class ErrorUtils {
}
error_message.put("message", message);
- error_message.put(Constants.SOURCE_TYPE, "error");
+ error_message.put(Constants.SENSOR_TYPE, "error");
error_message.put("exception", exception);
error_message.put("stack", stackTrace);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
index a046801..10ab03d 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
@@ -17,10 +17,13 @@
*/
package org.apache.metron.pcap;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import com.google.common.collect.BiMap;
@@ -210,6 +213,15 @@ public class PcapUtils {
}
+ public static String convertHexToIpv4Ip(String hex) {
+ List<Integer> ipSegments = new ArrayList<>();
+ for(int i = 0; i < hex.length(); i += 2) {
+ String segment = hex.substring(i, i + 2);
+ ipSegments.add(Integer.parseInt(segment, 16));
+ }
+ return Joiner.on(".").join(ipSegments);
+ }
+
/**
* Gets the session key.
*
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
index 581d74f..78371d8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
@@ -22,7 +22,7 @@ import org.json.simple.JSONObject;
public class TopologyUtils {
- public static String getSourceType(JSONObject message) {
- return (String) message.get(Constants.SOURCE_TYPE);
+ public static String getSensorType(JSONObject message) {
+ return (String) message.get(Constants.SENSOR_TYPE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
new file mode 100644
index 0000000..20026b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigurationsUtils {
+
+ public static CuratorFramework getClient(String zookeeperUrl) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ }
+
+ public static void writeToZookeeperFromFile(String path, String filePath, String zookeeperUrl) throws Exception {
+ writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+ }
+
+ public static void writerGlobalConfigToZookeeper(byte[] configData, String zookeeperUrl) throws Exception {
+ writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, zookeeperUrl);
+ }
+
+ public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+ writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, zookeeperUrl);
+ }
+
+ public static void writeToZookeeper(String path, byte[] configData, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ client.setData().forPath(path, configData);
+ } catch (KeeperException.NoNodeException e) {
+ client.create().creatingParentsIfNeeded().forPath(path, configData);
+ }
+ client.close();
+ }
+
+ public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+ configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+ List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+ for(String sensorType: sensorTypes) {
+ configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+ }
+ }
+
+ public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+ return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+ }
+
+ public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+ return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+ }
+
+ public static byte[] readConfigBytesFromZookeeper(String path, CuratorFramework client) throws Exception {
+ return client.getData().forPath(path);
+ }
+
+ public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+ ConfigurationsUtils.writerGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+ Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+ for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+ }
+ }
+
+ public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+ return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+ }
+
+ public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+ Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+ for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
+ sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+ }
+ return sensorEnrichmentConfigs;
+ }
+
+ public static void dumpConfigs(String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+ for (String child : children) {
+ byte[] data = client.getData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + child);
+ System.out.println("Config for source " + child);
+ System.out.println(new String(data));
+ System.out.println();
+ }
+ client.close();
+ }
+
+ public static void main(String[] args) {
+
+ Options options = new Options();
+ {
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\"-config.json");
+ o.setArgName("DIR_NAME");
+ o.setRequired(false);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+ o.setArgName("ZK_QUORUM");
+ o.setRequired(true);
+ options.addOption(o);
+ }
+
+ try {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException pe) {
+ pe.printStackTrace();
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+ System.exit(-1);
+ }
+ if (cmd.hasOption("h")) {
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+ System.exit(0);
+ }
+
+ String zkQuorum = cmd.getOptionValue("z");
+ if (cmd.hasOption("p")) {
+ String sourcePath = cmd.getOptionValue("p");
+ uploadConfigsToZookeeper(sourcePath, zkQuorum);
+ }
+
+ ConfigurationsUtils.dumpConfigs(zkQuorum);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
index 93b0a58..cffcd68 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -75,4 +75,8 @@ public enum JSONUtils {
return _mapper.get().writeValueAsString(o);
}
}
+
+ public byte[] toJSON(Object config) throws JsonProcessingException {
+ return _mapper.get().writeValueAsBytes(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
index b257b24..291b849 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.utils.ConfigUtils;
@@ -62,7 +62,7 @@ public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializ
}
@Override
- public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+ public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
Put put = new Put(getKey(tuple, message));
Map<String, byte[]> values = getValues(tuple, message);
for(String column: values.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
index 9b627e6..c3a930c 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -18,14 +18,14 @@
package org.apache.metron.writer.interfaces;
import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
import java.util.List;
import java.util.Map;
public interface BulkMessageWriter<T> extends AutoCloseable {
- void init(Map stormConf);
- void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
+ void init(Map stormConf, Configurations configuration) throws Exception;
+ void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
index 12de836..25c8a5a 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
@@ -18,10 +18,10 @@
package org.apache.metron.writer.interfaces;
import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
public interface MessageWriter<T> extends AutoCloseable {
void init();
- void write(String sourceType, SourceConfig configuration, Tuple tuple, T message) throws Exception;
+ void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
new file mode 100644
index 0000000..fa2385c
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class PcapUtilsTest {
+
+ @Test
+ public void testConvertHexToIpv4Ip() {
+ String hex = "c0a88a9e";
+ String ipAddress = PcapUtils.convertHexToIpv4Ip(hex);
+ Assert.assertEquals("192.168.138.158", ipAddress);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
deleted file mode 100644
index 34109b8..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "index": "bro",
- "batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
- },
- "threatIntelFieldMap":
- {
- "ip": ["ip_dst_addr", "ip_src_addr"]
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
deleted file mode 100644
index 82c7c5e..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "index": "pcap",
- "batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_src_addr", "ip_dst_addr"],
- "host": ["ip_src_addr", "ip_dst_addr"]
- },
- "threatIntelFieldMap":
- {
- "ip": ["ip_src_addr", "ip_dst_addr"]
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
deleted file mode 100644
index 1208637..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "index": "snort",
- "batchSize": 1,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
- },
- "threatIntelFieldMap":
- {
- "ip": ["ip_dst_addr", "ip_src_addr"]
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
deleted file mode 100644
index 65de961..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "index": "yaf",
- "batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
- },
- "threatIntelFieldMap":
- {
- "ip": ["ip_dst_addr", "ip_src_addr"]
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/pom.xml b/metron-streaming/Metron-Elasticsearch/pom.xml
new file mode 100644
index 0000000..ab9242a
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>Metron-Elasticsearch</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Testing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Topologies</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${global_mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
+
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ <inherited>true</inherited>
+ </plugin>
+ </plugins>
+ </reporting>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Separates the unit tests from the integration tests. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
+ <configuration>
+ <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <skip>true</skip>
+ <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+ <trimStackTrace>false</trimStackTrace>
+ </configuration>
+ <executions>
+ <execution>
+ <id>unit-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include unit tests within integration-test phase. -->
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- Exclude integration tests within (unit) test phase. -->
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the integration-test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include integration tests within integration-test phase. -->
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>storm:storm-core:*</exclude>
+ <exclude>storm:storm-lib:*</exclude>
+ <exclude>org.slf4j.impl*</exclude>
+ <exclude>org.slf4j:slf4j-log4j*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>.yaml</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml b/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..35cbcc3
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/resources/Metron_Configs/etc</directory>
+ <outputDirectory>/config/etc</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0644</fileMode>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/target</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ <outputDirectory>/lib</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
new file mode 100644
index 0000000..45631f2
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
+
+ private Map<String, String> optionalSettings;
+ private transient TransportClient client;
+ private SimpleDateFormat dateFormat;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ElasticsearchWriter.class);
+
+ public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
+ this.optionalSettings = optionalSettings;
+ return this;
+ }
+
+ @Override
+ public void init(Map stormConf, Configurations configurations) {
+ Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
+ ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+ builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+ builder.put("client.transport.ping_timeout","500s");
+ if (optionalSettings != null) {
+ builder.put(optionalSettings);
+ }
+ client = new TransportClient(builder.build())
+ .addTransportAddress(new InetSocketTransportAddress((String) globalConfiguration.get("es.ip"), (Integer) globalConfiguration.get("es.port")));
+ dateFormat = new SimpleDateFormat((String) globalConfiguration.get("es.date.format"));
+
+ }
+
+ @Override
+ public void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
+ String indexPostfix = dateFormat.format(new Date());
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ for(JSONObject message: messages) {
+ String indexName = sensorType;
+ if (sensorEnrichmentConfig != null) {
+ indexName = sensorEnrichmentConfig.getIndex();
+ }
+ IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
+ sensorType);
+
+ indexRequestBuilder.setSource(message.toJSONString());
+ bulkRequest.add(indexRequestBuilder);
+ }
+ BulkResponse resp = bulkRequest.execute().actionGet();
+ if (resp.hasFailures()) {
+ throw new Exception(resp.buildFailureMessage());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties b/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
new file mode 100644
index 0000000..1381b49
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+spout.kafka.topic.asa=asa
+spout.kafka.topic.bro=bro
+spout.kafka.topic.fireeye=fireeye
+spout.kafka.topic.ise=ise
+spout.kafka.topic.lancope=lancope
+spout.kafka.topic.paloalto=paloalto
+spout.kafka.topic.pcap=pcap
+spout.kafka.topic.snort=snort
+spout.kafka.topic.yaf=yaf
+
+##### Indexing #####
+writer.class.name=org.apache.metron.writer.ElasticsearchWriter
+
+##### ElasticSearch #####
+
+es.ip=10.22.0.214
+es.port=9300
+es.clustername=elasticsearch
+
+##### MySQL #####
+
+mysql.ip=10.22.0.214
+mysql.port=3306
+mysql.username=root
+mysql.password=hadoop123
+
+##### Metrics #####
+
+#reporters
+org.apache.metron.metrics.reporter.graphite=true
+org.apache.metron.metrics.reporter.console=false
+org.apache.metron.metrics.reporter.jmx=false
+
+#Graphite Addresses
+
+org.apache.metron.metrics.graphite.address=localhost
+org.apache.metron.metrics.graphite.port=2023
+
+#TelemetryParserBolt
+org.apache.metron.metrics.TelemetryParserBolt.acks=true
+org.apache.metron.metrics.TelemetryParserBolt.emits=true
+org.apache.metron.metrics.TelemetryParserBolt.fails=true
+
+
+#GenericEnrichmentBolt
+org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
+org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
+org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
+
+
+#TelemetryIndexingBolt
+org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
+org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
+org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
+
+##### Host Enrichment #####
+
+org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
+{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
+{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
+
+##### HDFS #####
+
+bolt.hdfs.batch.size=5000
+bolt.hdfs.field.delimiter=|
+bolt.hdfs.file.rotation.size.in.mb=5
+bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
+bolt.hdfs.wip.file.path=/paloalto/wip
+bolt.hdfs.finished.file.path=/paloalto/rotated
+bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
+
+##### HBase #####
+bolt.hbase.table.name=pcap
+bolt.hbase.table.fields=t:value
+bolt.hbase.table.key.tuple.field.name=key
+bolt.hbase.table.timestamp.tuple.field.name=timestamp
+bolt.hbase.enable.batching=false
+bolt.hbase.write.buffer.size.in.bytes=2000000
+bolt.hbase.durability=SKIP_WAL
+bolt.hbase.partitioner.region.info.refresh.interval.mins=60
+
+##### Threat Intel #####
+
+threat.intel.tracker.table=
+threat.intel.tracker.cf=
+threat.intel.ip.table=
+threat.intel.ip.cf=
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..2765c25
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+ private String indexDir = "target/elasticsearch";
+ private String dateFormat = "yyyy.MM.dd.hh";
+ private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
+
+ @Override
+ InMemoryComponent getSearchComponent(final Properties topologyProperties) {
+ return new ElasticSearchComponent.Builder()
+ .withHttpPort(9211)
+ .withIndexDir(new File(indexDir))
+ .build();
+ }
+
+ @Override
+ Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+ return new Processor<List<Map<String, Object>>>() {
+ List<Map<String, Object>> docs = null;
+ public ReadinessState process(ComponentRunner runner) {
+ ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class);
+ if (elasticSearchComponent.hasIndex(index)) {
+ List<Map<String, Object>> docsFromDisk;
+ try {
+ docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+ docsFromDisk = readDocsFromDisk(hdfsDir);
+ System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+ }
+ if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+ return ReadinessState.NOT_READY;
+ } else {
+ return ReadinessState.READY;
+ }
+ } else {
+ return ReadinessState.NOT_READY;
+ }
+ }
+
+ public List<Map<String, Object>> getResult() {
+ return docs;
+ }
+ };
+ }
+
+ @Override
+ void setAdditionalProperties(Properties topologyProperties) {
+ topologyProperties.setProperty("writer.class.name", "org.apache.metron.writer.ElasticsearchWriter");
+ }
+
+ @Override
+ public String cleanField(String field) {
+ return field;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
new file mode 100644
index 0000000..671c4f5
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticSearchComponent implements InMemoryComponent {
+
+ public static class Builder{
+ private int httpPort;
+ private File indexDir;
+ private Map<String, String> extraElasticSearchSettings = null;
+ public Builder withHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ return this;
+ }
+ public Builder withIndexDir(File indexDir) {
+ this.indexDir = indexDir;
+ return this;
+ }
+ public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
+ this.extraElasticSearchSettings = extraElasticSearchSettings;
+ return this;
+ }
+ public ElasticSearchComponent build() {
+ return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
+ }
+ }
+
+ private Client client;
+ private Node node;
+ private int httpPort;
+ private File indexDir;
+ private Map<String, String> extraElasticSearchSettings;
+
+ public ElasticSearchComponent(int httpPort, File indexDir) {
+ this(httpPort, indexDir, null);
+ }
+ public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
+ this.httpPort = httpPort;
+ this.indexDir = indexDir;
+ this.extraElasticSearchSettings = extraElasticSearchSettings;
+ }
+ public Client getClient() {
+ return client;
+ }
+
+ private void cleanDir(File dir) throws IOException {
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+ public void start() throws UnableToStartException {
+ File logDir= new File(indexDir, "/logs");
+ File dataDir= new File(indexDir, "/data");
+ try {
+ cleanDir(logDir);
+ cleanDir(dataDir);
+
+ } catch (IOException e) {
+ throw new UnableToStartException("Unable to clean log or data directories", e);
+ }
+ ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
+ .put("node.http.enabled", true)
+ .put("http.port", httpPort)
+ .put("cluster.name", "metron")
+ .put("path.logs",logDir.getAbsolutePath())
+ .put("path.data",dataDir.getAbsolutePath())
+ .put("gateway.type", "none")
+ .put("index.store.type", "memory")
+ .put("index.number_of_shards", 1)
+ .put("node.mode", "network")
+ .put("index.number_of_replicas", 1);
+ if(extraElasticSearchSettings != null) {
+ immutableSettings = immutableSettings.put(extraElasticSearchSettings);
+ }
+ Settings settings = immutableSettings.build();
+ node = NodeBuilder.nodeBuilder().settings(settings).node();
+ node.start();
+ settings = ImmutableSettings.settingsBuilder()
+ .put("cluster.name", "metron").build();
+ client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress("localhost",
+ 9300));
+
+ waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
+ }
+
+ public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
+ try {
+ ClusterHealthResponse healthResponse =
+ (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
+ if (healthResponse != null && healthResponse.isTimedOut()) {
+ throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
+ + " and not " + status.name()
+ + ", from here on, everything will fail!");
+ }
+ } catch (ElasticsearchTimeoutException e) {
+ throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
+ }
+ }
+
+ public List<Map<String, Object>> getAllIndexedDocs(String index, String sourceType) throws IOException {
+ return getAllIndexedDocs(index, sourceType, null);
+ }
+ public List<Map<String, Object>> getAllIndexedDocs(String index, String sourceType, String subMessage) throws IOException {
+ getClient().admin().indices().refresh(new RefreshRequest());
+ SearchResponse response = getClient().prepareSearch(index)
+ .setTypes(sourceType)
+ .setSource("message")
+ .setFrom(0)
+ .setSize(1000)
+ .execute().actionGet();
+ List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
+ for (SearchHit hit : response.getHits()) {
+ Object o = null;
+ if(subMessage == null) {
+ o = hit.getSource();
+ }
+ else {
+ o = hit.getSource().get(subMessage);
+ }
+ ret.add((Map<String, Object>)(o));
+ }
+ return ret;
+ }
+ public boolean hasIndex(String indexName) {
+ Set<String> indices = getClient().admin()
+ .indices()
+ .stats(new IndicesStatsRequest())
+ .actionGet()
+ .getIndices()
+ .keySet();
+ return indices.contains(indexName);
+
+ }
+
+ public void stop() {
+ node.stop();
+ node = null;
+ client = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index a2cec5a..bfb4d91 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -25,7 +25,11 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
@@ -51,7 +55,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
@Override
public Set<String> getStreamIds(JSONObject message) {
Set<String> streamIds = new HashSet<>();
- String sourceType = TopologyUtils.getSourceType(message);
+ String sourceType = TopologyUtils.getSensorType(message);
for (String enrichmentType : getFieldMap(sourceType).keySet()) {
streamIds.add(enrichmentType);
}
@@ -81,7 +85,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
return message;
}
- public Map<String, List<String>> getFieldMap(String sourceType) {
- return configurations.get(sourceType).getEnrichmentFieldMap();
+ protected Map<String, List<String>> getFieldMap(String sensorType) {
+ return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 7970674..c37133d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -105,10 +105,9 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
@SuppressWarnings("unchecked")
@Override
public Map<String, JSONObject> splitMessage(JSONObject message) {
-
Map<String, JSONObject> streamMessageMap = new HashMap<>();
- String sourceType = TopologyUtils.getSourceType(message);
- Map<String, List<String>> enrichmentFieldMap = getFieldMap(sourceType);
+ String sensorType = TopologyUtils.getSensorType(message);
+ Map<String, List<String>> enrichmentFieldMap = getFieldMap(sensorType);
for (String enrichmentType : enrichmentFieldMap.keySet()) {
List<String> fields = enrichmentFieldMap.get(enrichmentType);
JSONObject enrichmentObject = new JSONObject();
@@ -116,7 +115,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
for (String field : fields) {
enrichmentObject.put(getKeyName(enrichmentType, field), message.get(field));
}
- enrichmentObject.put(Constants.SOURCE_TYPE, sourceType);
+ enrichmentObject.put(Constants.SENSOR_TYPE, sensorType);
streamMessageMap.put(enrichmentType, enrichmentObject);
}
}
@@ -124,8 +123,8 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
return streamMessageMap;
}
- protected Map<String, List<String>> getFieldMap(String sourceType) {
- return configurations.get(sourceType).getEnrichmentFieldMap();
+ protected Map<String, List<String>> getFieldMap(String sensorType) {
+ return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
}
protected String getKeyName(String type, String field) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index b5c4c44..08b223c 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,29 +18,26 @@
package org.apache.metron.enrichment.bolt;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Splitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
import org.apache.metron.Constants;
import org.apache.metron.bolt.ConfiguredBolt;
import org.apache.metron.domain.Enrichment;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.helpers.topology.ErrorUtils;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import org.apache.metron.helpers.topology.ErrorUtils;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* Uses an adapter to enrich telemetry messages with additional metadata
@@ -157,8 +154,8 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
for (Object o : rawMessage.keySet()) {
String field = (String) o;
String value = (String) rawMessage.get(field);
- if (field.equals(Constants.SOURCE_TYPE)) {
- enrichedMessage.put(Constants.SOURCE_TYPE, value);
+ if (field.equals(Constants.SENSOR_TYPE)) {
+ enrichedMessage.put(Constants.SENSOR_TYPE, value);
} else {
JSONObject enrichedField = new JSONObject();
if (value != null && value.length() != 0) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 3516ee0..014e0a9 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -34,8 +34,8 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
}
@Override
- public Map<String, List<String>> getFieldMap(String sourceType) {
- return configurations.get(sourceType).getThreatIntelFieldMap();
+ public Map<String, List<String>> getFieldMap(String sensorType) {
+ return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
index a43360e..692c327 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
@@ -29,8 +29,8 @@ public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
}
@Override
- protected Map<String, List<String>> getFieldMap(String sourceType) {
- return configurations.get(sourceType).getThreatIntelFieldMap();
+ protected Map<String, List<String>> getFieldMap(String sensorType) {
+ return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index 1f5d04d..d32fc07 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -71,11 +71,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${global_elasticsearch_version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${http.client.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
deleted file mode 100644
index 423a5c2..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.indexing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.metron.bolt.ConfiguredBolt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.metrics.MetricReporter;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractIndexingBolt extends ConfiguredBolt {
- /**
- *
- */
- private static final long serialVersionUID = -6710596708304282838L;
-
- protected static final Logger LOG = LoggerFactory
- .getLogger(AbstractIndexingBolt.class);
-
- protected OutputCollector _collector;
- protected IndexAdapter _adapter;
- protected MetricReporter _reporter;
-
- protected String _IndexIP;
- protected int _IndexPort = 0;
- protected String _ClusterName;
- protected String _IndexName;
- protected String _DocumentName;
- protected int _BulkIndexNumber = 10;
-
- protected Counter ackCounter, emitCounter, failCounter;
-
- public AbstractIndexingBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- protected void registerCounters() {
-
- String ackString = _adapter.getClass().getSimpleName() + ".ack";
-
- String emitString = _adapter.getClass().getSimpleName() + ".emit";
-
- String failString = _adapter.getClass().getSimpleName() + ".fail";
-
- ackCounter = _reporter.registerCounter(ackString);
- emitCounter = _reporter.registerCounter(emitString);
- failCounter = _reporter.registerCounter(failString);
-
- }
-
- public final void prepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) {
- _collector = collector;
-
- if (this._IndexIP == null)
- throw new IllegalStateException("_IndexIP must be specified");
- if (this._IndexPort == 0)
- throw new IllegalStateException("_IndexPort must be specified");
- if (this._ClusterName == null)
- throw new IllegalStateException("_ClusterName must be specified");
- if (this._IndexName == null)
- throw new IllegalStateException("_IndexName must be specified");
- if (this._DocumentName == null)
- throw new IllegalStateException("_DocumentName must be specified");
- if (this._adapter == null)
- throw new IllegalStateException("IndexAdapter must be specified");
-
- try {
- doPrepare(conf, topologyContext, collector);
- } catch (IOException e) {
- LOG.error("Counld not initialize...");
- e.printStackTrace();
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declearer) {
-
- }
-
- abstract void doPrepare(Map conf, TopologyContext topologyContext,
- OutputCollector collector) throws IOException;
-
-}