You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by si...@apache.org on 2016/01/14 18:03:44 UTC
[49/85] [partial] incubator-metron git commit: Rename all OpenSOC
files to Metron
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
new file mode 100644
index 0000000..c85087d
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
@@ -0,0 +1,311 @@
+package com.opensoc.alerts.adapters;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.opensoc.alerts.interfaces.AlertsAdapter;
+
+@SuppressWarnings("serial")
+public class ThreatAlertsAdapter implements AlertsAdapter, Serializable {
+
+ String enrichment_tag;
+
+ HTableInterface blacklist_table;
+ HTableInterface whitelist_table;
+ InetAddressValidator ipvalidator = new InetAddressValidator();
+ String _whitelist_table_name;
+ String _blacklist_table_name;
+ String _quorum;
+ String _port;
+ String _topologyname;
+ Configuration conf = null;
+
+ Cache<String, String> cache;
+ String _topology_name;
+
+ Set<String> loaded_whitelist = new HashSet<String>();
+ Set<String> loaded_blacklist = new HashSet<String>();
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(ThreatAlertsAdapter.class);
+
+ public ThreatAlertsAdapter(Map<String, String> config) {
+ try {
+
+ if (!config.containsKey("whitelist_table_name"))
+ throw new Exception("Whitelist table name is missing");
+
+ _whitelist_table_name = config.get("whitelist_table_name");
+
+ if (!config.containsKey("blacklist_table_name"))
+ throw new Exception("Blacklist table name is missing");
+
+ _blacklist_table_name = config.get("blacklist_table_name");
+
+ if (!config.containsKey("quorum"))
+ throw new Exception("Quorum name is missing");
+
+ _quorum = config.get("quorum");
+
+ if (!config.containsKey("port"))
+ throw new Exception("port name is missing");
+
+ _port = config.get("port");
+
+ if (!config.containsKey("_MAX_CACHE_SIZE_OBJECTS_NUM"))
+ throw new Exception("_MAX_CACHE_SIZE_OBJECTS_NUM name is missing");
+
+ int _MAX_CACHE_SIZE_OBJECTS_NUM = Integer.parseInt(config
+ .get("_MAX_CACHE_SIZE_OBJECTS_NUM"));
+
+ if (!config.containsKey("_MAX_TIME_RETAIN_MINUTES"))
+ throw new Exception("_MAX_TIME_RETAIN_MINUTES name is missing");
+
+ int _MAX_TIME_RETAIN_MINUTES = Integer.parseInt(config
+ .get("_MAX_TIME_RETAIN_MINUTES"));
+
+ cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+ .expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
+ .build();
+
+ enrichment_tag = config.get("enrichment_tag");
+
+ } catch (Exception e) {
+ System.out.println("Could not initialize alerts adapter");
+ e.printStackTrace();
+ System.exit(0);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public boolean initialize() {
+
+ conf = HBaseConfiguration.create();
+ // conf.set("hbase.zookeeper.quorum", _quorum);
+ // conf.set("hbase.zookeeper.property.clientPort", _port);
+
+ LOG.trace("[OpenSOC] Connecting to hbase with conf:" + conf);
+ LOG.trace("[OpenSOC] Whitelist table name: " + _whitelist_table_name);
+ LOG.trace("[OpenSOC] Whitelist table name: " + _blacklist_table_name);
+ LOG.trace("[OpenSOC] ZK Client/port: "
+ + conf.get("hbase.zookeeper.quorum") + " -> "
+ + conf.get("hbase.zookeeper.property.clientPort"));
+
+ try {
+
+ LOG.trace("[OpenSOC] Attempting to connect to hbase");
+
+ HConnection connection = HConnectionManager.createConnection(conf);
+
+ LOG.trace("[OpenSOC] CONNECTED TO HBASE");
+
+ HBaseAdmin hba = new HBaseAdmin(conf);
+
+ if (!hba.tableExists(_whitelist_table_name))
+ throw new Exception("Whitelist table doesn't exist");
+
+ if (!hba.tableExists(_blacklist_table_name))
+ throw new Exception("Blacklist table doesn't exist");
+
+ whitelist_table = new HTable(conf, _whitelist_table_name);
+
+ LOG.trace("[OpenSOC] CONNECTED TO TABLE: " + _whitelist_table_name);
+ blacklist_table = new HTable(conf, _blacklist_table_name);
+ LOG.trace("[OpenSOC] CONNECTED TO TABLE: " + _blacklist_table_name);
+
+ if (connection == null || whitelist_table == null
+ || blacklist_table == null)
+ throw new Exception("Unable to initialize hbase connection");
+
+ Scan scan = new Scan();
+
+ ResultScanner rs = whitelist_table.getScanner(scan);
+ try {
+ for (Result r = rs.next(); r != null; r = rs.next()) {
+ loaded_whitelist.add(Bytes.toString(r.getRow()));
+ }
+ } catch (Exception e) {
+ LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+ e.printStackTrace();
+ } finally {
+ rs.close(); // always close the ResultScanner!
+ hba.close();
+ }
+ whitelist_table.close();
+
+ LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+
+ scan = new Scan();
+
+ rs = blacklist_table.getScanner(scan);
+ try {
+ for (Result r = rs.next(); r != null; r = rs.next()) {
+ loaded_blacklist.add(Bytes.toString(r.getRow()));
+ }
+ } catch (Exception e) {
+ LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+ e.printStackTrace();
+ } finally {
+ rs.close(); // always close the ResultScanner!
+ hba.close();
+ }
+ blacklist_table.close();
+
+ LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+
+ rs.close(); // always close the ResultScanner!
+ hba.close();
+
+ return true;
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ }
+
+ return false;
+
+ }
+
+ @Override
+ public boolean refresh() throws Exception {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, JSONObject> alert(JSONObject raw_message) {
+
+ System.out.println("LOOKING FOR ENRICHMENT TAG: " + enrichment_tag);
+
+ Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
+ JSONObject content = (JSONObject) raw_message.get("message");
+
+ JSONObject enrichment = null;
+
+ if (raw_message.containsKey("enrichment"))
+ enrichment = (JSONObject) raw_message.get("enrichment");
+ else
+ return null;
+
+ if (enrichment.containsKey(enrichment_tag)) {
+
+ System.out.println("FOUND TAG: " + enrichment_tag);
+
+ JSONObject threat = (JSONObject) enrichment.get(enrichment_tag);
+
+ int cnt = 0;
+ Object enriched_key = null;
+
+ for (Object key : threat.keySet()) {
+ JSONObject tmp = (JSONObject) threat.get(key);
+ cnt = cnt + tmp.size();
+ if (tmp.size() > 0)
+ enriched_key = key;
+ }
+
+ if (cnt == 0) {
+ System.out.println("TAG HAS NO ELEMENTS");
+ return null;
+ }
+
+ JSONObject alert = new JSONObject();
+
+ String source = "unknown";
+ String dest = "unknown";
+ String host = "unknown";
+
+ if (content.containsKey("ip_src_addr")) {
+ source = content.get("ip_src_addr").toString();
+
+ if (RangeChecker.checkRange(loaded_whitelist, source))
+ host = source;
+ }
+
+ if (content.containsKey("ip_dst_addr")) {
+ dest = content.get("ip_dst_addr").toString();
+
+ if (RangeChecker.checkRange(loaded_whitelist, dest))
+ host = dest;
+ }
+
+ JSONObject threatQualifier = (JSONObject) threat.get(enriched_key);
+
+ alert.put("designated_host", host);
+ String description =
+
+ new StringBuilder()
+ .append("Threat Intelligence match for ")
+ .append(content.get(enriched_key).toString())
+ .append(" from source: ")
+ .append(threatQualifier.keySet().iterator().next().toString())
+ .toString();
+ alert.put("description", description);
+ alert.put("priority", "MED");
+
+ String alert_id = generateAlertId(source, dest, 0);
+
+ alert.put("alert_id", alert_id);
+ alerts.put(alert_id, alert);
+
+ alert.put("enrichment", enrichment);
+
+ return alerts;
+ } else {
+ System.out.println("DID NOT FIND TAG: " + enrichment_tag);
+ return null;
+ }
+
+ }
+
+ @Override
+ public boolean containsAlertId(String alert) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ protected String generateAlertId(String source_ip, String dst_ip,
+ int alert_type) {
+
+ String key = makeKey(source_ip, dst_ip, alert_type);
+
+ if (cache.getIfPresent(key) != null)
+ return cache.getIfPresent(key);
+
+ String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID();
+
+ cache.put(key, new_UUID);
+ key = makeKey(dst_ip, source_ip, alert_type);
+ cache.put(key, new_UUID);
+
+ return new_UUID;
+
+ }
+
+ private String makeKey(String ip1, String ip2, int alert_type) {
+ return (ip1 + "-" + ip2 + "-" + alert_type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
new file mode 100644
index 0000000..e22c3cf
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+
+import com.codahale.metrics.Counter;
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractTaggerBolt extends BaseRichBolt {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -6710596708304282838L;
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(AbstractTaggerBolt.class);
+
+ protected OutputCollector _collector;
+ protected TaggerAdapter _adapter;
+
+ protected String OutputFieldName;
+ protected JSONObject _identifier;
+ protected MetricReporter _reporter;
+
+ protected Counter ackCounter, emitCounter, failCounter;
+
+ protected void registerCounters() {
+
+ String ackString = _adapter.getClass().getSimpleName() + ".ack";
+
+ String emitString = _adapter.getClass().getSimpleName() + ".emit";
+
+ String failString = _adapter.getClass().getSimpleName() + ".fail";
+
+ ackCounter = _reporter.registerCounter(ackString);
+ emitCounter = _reporter.registerCounter(emitString);
+ failCounter = _reporter.registerCounter(failString);
+
+ }
+
+ public final void prepare(Map conf, TopologyContext topologyContext,
+ OutputCollector collector) {
+ _collector = collector;
+
+ if (this._adapter == null)
+ throw new IllegalStateException("Tagging must be specified");
+ if(this._identifier == null)
+ throw new IllegalStateException("Identifier must be specified");
+ try {
+ doPrepare(conf, topologyContext, collector);
+ } catch (IOException e) {
+ LOG.error("Counld not initialize...");
+ e.printStackTrace();
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declearer) {
+ declearer.declare(new Fields(this.OutputFieldName));
+ }
+
+ abstract void doPrepare(Map conf, TopologyContext topologyContext,
+ OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
new file mode 100644
index 0000000..a31e1b7
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public class TelemetryTaggerBolt extends AbstractTaggerBolt {
+
+ /**
+ * Use an adapter to tag existing telemetry messages with alerts. The list
+ * of available tagger adapters is located under
+ * com.opensoc.tagging.adapters. At the time of the release the following
+ * adapters are available:
+ *
+ * <p>
+ * <ul>
+ * <li>RegexTagger = read a list or regular expressions and tag a message if
+ * they exist in a message
+ * <li>StaticAllTagger = tag each message with a static alert
+ * <ul>
+ * <p>
+ */
+ private static final long serialVersionUID = -2647123143398352020L;
+ private Properties metricProperties;
+ private JSONObject metricConfiguration;
+
+ /**
+ *
+ * @param tagger
+ * - tagger adapter for generating alert tags
+ * @return instance of bolt
+ */
+ public TelemetryTaggerBolt withMessageTagger(TaggerAdapter tagger) {
+ _adapter = tagger;
+ return this;
+ }
+
+ /**
+ *
+ * @param OutputFieldName
+ * - output name of the tuple coming out of this bolt
+ * @return - instance of this bolt
+ */
+ public TelemetryTaggerBolt withOutputFieldName(String OutputFieldName) {
+ this.OutputFieldName = OutputFieldName;
+ return this;
+ }
+
+ /**
+ *
+ * @param metricProperties
+ * - metric output to graphite
+ * @return - instance of this bolt
+ */
+ public TelemetryTaggerBolt withMetricProperties(Properties metricProperties) {
+ this.metricProperties = metricProperties;
+ return this;
+ }
+
+ /**
+ *
+ * @param identifier
+ * - the identifier tag for tagging telemetry messages with
+ * alerts out of this bolt
+ * @return - instance of this bolt
+ */
+
+ public TelemetryTaggerBolt withIdentifier(JSONObject identifier) {
+ this._identifier = identifier;
+ return this;
+ }
+
+ /**
+ * @param config
+ * A class for generating custom metrics into graphite
+ * @return Instance of this class
+ */
+
+ public TelemetryTaggerBolt withMetricConfiguration(Configuration config) {
+ this.metricConfiguration = JSONEncoderHelper.getJSON(config
+ .subset("com.opensoc.metrics"));
+ return this;
+ }
+
+ @Override
+ void doPrepare(Map conf, TopologyContext topologyContext,
+ OutputCollector collector) throws IOException {
+
+ LOG.info("[OpenSOC] Preparing TelemetryParser Bolt...");
+
+ try {
+ _reporter = new MetricReporter();
+ _reporter.initialize(metricProperties, TelemetryTaggerBolt.class);
+ LOG.info("[OpenSOC] Initialized metrics");
+ } catch (Exception e) {
+ LOG.info("[OpenSOC] Could not initialize metrics");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void execute(Tuple tuple) {
+
+ LOG.trace("[OpenSOC] Starting to process message for alerts");
+ JSONObject original_message = null;
+
+ try {
+
+ original_message = (JSONObject) tuple.getValue(0);
+
+ if (original_message == null || original_message.isEmpty())
+ throw new Exception("Could not parse message from byte stream");
+
+ LOG.trace("[OpenSOC] Received tuple: " + original_message);
+
+ JSONObject alerts_tag = new JSONObject();
+ JSONArray alerts_list = _adapter.tag(original_message);
+
+ LOG.trace("[OpenSOC] Tagged message: " + alerts_list);
+
+ if (alerts_list.size() != 0) {
+ if (original_message.containsKey("alerts")) {
+ JSONObject tag = (JSONObject) original_message
+ .get("alerts");
+ JSONArray already_triggered = (JSONArray) tag
+ .get("triggered");
+ alerts_list.addAll(already_triggered);
+ LOG.trace("[OpenSOC] Created a new string of alerts");
+ }
+
+ alerts_tag.put("identifier", _identifier);
+ alerts_tag.put("triggered", alerts_list);
+ original_message.put("alerts", alerts_tag);
+
+ LOG.debug("[OpenSOC] Detected alerts: " + alerts_tag);
+ }
+ else
+ {
+ LOG.debug("[OpenSOC] The following messages did not contain alerts: " + original_message);
+ }
+
+ _collector.ack(tuple);
+ _collector.emit(new Values(original_message));
+
+ /*if (metricConfiguration != null) {
+ emitCounter.inc();
+ ackCounter.inc();
+ }*/
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.error("Failed to tag message :" + original_message);
+ e.printStackTrace();
+ _collector.fail(tuple);
+
+ /*
+ if (metricConfiguration != null) {
+ failCounter.inc();
+ }*/
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declearer) {
+ declearer.declare(new Fields(this.OutputFieldName));
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
new file mode 100644
index 0000000..2ec6377
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTaggerAdapter implements TaggerAdapter, Serializable{
+
+ protected static final Logger _LOG = LoggerFactory
+ .getLogger(AbstractTaggerAdapter.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
new file mode 100644
index 0000000..2d8109f
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class RegexTagger extends AbstractTaggerAdapter{
+
+ /**
+ * Reads a regex rules file and tags a message with alerts if any rule from that file
+ * matches anything in the telemetry message
+ */
+ private static final long serialVersionUID = -6091495636459799411L;
+ Map <String, JSONObject> _rules;
+
+ /**
+ *
+ * @param rules rules read from a properties XML file
+ */
+ public RegexTagger(Map<String, JSONObject> rules)
+ {
+ _rules = rules;
+ }
+
+ /**
+ * @param raw_message telemetry message to be tagged
+ */
+ @SuppressWarnings("unchecked")
+ public JSONArray tag(JSONObject raw_message) {
+
+ JSONArray ja = new JSONArray();
+ String message_as_string = raw_message.toString();
+
+ for(String rule : _rules.keySet())
+ {
+ if (message_as_string.matches(rule))
+ {
+ ja.add(_rules.get(rule));
+ }
+ }
+
+ return ja;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
new file mode 100644
index 0000000..67c6c45
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class StaticAllTagger extends AbstractTaggerAdapter {
+
+ /**
+ * Attaches a static alerts tag to JSON telemetry messages
+ */
+ private static final long serialVersionUID = 7759427661169094065L;
+ private JSONObject _static_tag_message;
+ JSONArray ja = new JSONArray();
+
+ /**
+ *
+ * @param static_tag_message
+ * static alerts tag to attach to the message as a JSON
+ */
+ @SuppressWarnings("unchecked")
+ public StaticAllTagger(JSONObject static_tag_message) {
+ _static_tag_message = static_tag_message;
+ ja.add(_static_tag_message);
+ }
+
+ /**
+ * @param raw_message
+ * message to tag
+ */
+ public JSONArray tag(JSONObject raw_message) {
+
+ return ja;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
new file mode 100644
index 0000000..9fc11d7
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+ JSONArray tag(JSONObject raw_message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..8d812a9
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
@@ -0,0 +1,131 @@
+<!--Tue Apr 1 18:16:39 2014-->
+ <configuration>
+ <property>
+ <name>hbase.tmp.dir</name>
+ <value>/disk/h/hbase</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.chunkpool.maxsize</name>
+ <value>0.5</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.codecs</name>
+ <value>lzo,gz,snappy</value>
+ </property>
+ <property>
+ <name>hbase.hstore.flush.retries.number</name>
+ <value>120</value>
+ </property>
+ <property>
+ <name>hbase.client.keyvalue.maxsize</name>
+ <value>10485760</value>
+ </property>
+ <property>
+ <name>hbase.rootdir</name>
+ <value>hdfs://nn1:8020/apps/hbase/data</value>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.client.scanner.caching</name>
+ <value>100</value>
+ </property>
+ <property>
+ <name>hbase.superuser</name>
+ <value>hbase</value>
+ </property>
+ <property>
+ <name>hfile.block.cache.size</name>
+ <value>0.40</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.checksum.verify</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.mslab.enabled</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>107374182400</value>
+ </property>
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>zookeeper.session.timeout</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>zookeeper.znode.parent</name>
+ <value>/hbase-unsecure</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.global.memstore.lowerLimit</name>
+ <value>0.38</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.handler.count</name>
+ <value>240</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.mslab.chunksize</name>
+ <value>8388608</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>zkpr1,zkpr2,zkpr3</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.useMulti</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.hregion.majorcompaction</name>
+ <value>86400000</value>
+ </property>
+ <property>
+ <name>hbase.hstore.blockingStoreFiles</name>
+ <value>200</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.flush.size</name>
+ <value>134217728</value>
+ </property>
+ <property>
+ <name>hbase.security.authorization</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.global.memstore.upperLimit</name>
+ <value>0.4</value>
+ </property>
+ <property>
+ <name>hbase.hstore.compactionThreshold</name>
+ <value>4</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.block.multiplier</name>
+ <value>8</value>
+ </property>
+ <property>
+ <name>hbase.security.authentication</name>
+ <value>simple</value>
+ </property>
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dfs.domain.socket.path</name>
+ <value>/var/run/hdfs/dn_socket</value>
+ </property>
+ </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java b/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
new file mode 100644
index 0000000..65c74c0
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
@@ -0,0 +1,166 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.alerts.adapters;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Properties;
+
+import com.opensoc.test.AbstractConfigTest;
+import com.opensoc.alerts.adapters.AllAlertAdapter;
+
+ /**
+ * <ul>
+ * <li>Title: AllAlertAdapterTest</li>
+ * <li>Description: Tests for AllAlertAdapter</li>
+ * <li>Created: Oct 8, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class AllAlertAdapterTest extends AbstractConfigTest {
+
+ /**
+ * The allAlertAdapter.
+ */
+ private static AllAlertAdapter allAlertAdapter=null;
+
+ /**
+ * The connected.
+ */
+ private static boolean connected=false;
+
+ /**
+ * Constructs a new <code>AllAlertAdapterTest</code> instance.
+ * @param name
+ */
+ public AllAlertAdapterTest(String name) {
+ super(name);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ protected static void setUpBeforeClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ protected static void tearDownAfterClass() throws Exception {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+
+ @SuppressWarnings("unchecked")
+ protected void setUp() throws Exception {
+ super.setUp("com.opensoc.alerts.adapters.AllAlertAdapter");
+ Properties prop = super.getTestProperties();
+ assertNotNull(prop);
+ // this.setMode("global");
+ if(skipTests(this.getMode())){
+ System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
+ return;//skip tests
+ }else{
+ Map<String, String> settings = super.getSettings();
+ @SuppressWarnings("rawtypes")
+ Class loaded_class = Class.forName("com.opensoc.alerts.adapters.AllAlertAdapter");
+ @SuppressWarnings("rawtypes")
+ Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
+
+ AllAlertAdapterTest.allAlertAdapter = (AllAlertAdapter) constructor.newInstance(settings);
+ // AllAlertAdapterTest.allAlertAdapter = new AllAlertAdapter(settings)
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see junit.framework.TestCase#tearDown()
+ */
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+
+ /**
+ * Test method for {@link com.opensoc.alerts.adapters.AlllterAdapter#initialize()}.
+ */
+ public void testInitializeAdapter() {
+ if(skipTests(this.getMode())){
+ return;//skip tests
+ }else{
+
+ boolean initialized =AllAlertAdapterTest.getAllAlertAdapter().initialize();
+ assertTrue(initialized);
+ }
+ }
+
+ /**
+ * Test method for containsAlertId(@link com.opensoc.alerts.adapters.AlllterAdapter#containsAlertId()}.
+ */
+ public void testContainsAlertId(){
+ if(skipTests(this.getMode())){
+ return;//skip tests
+ }else{
+ boolean containsAlert =AllAlertAdapterTest.getAllAlertAdapter().containsAlertId("test");
+ assertFalse(containsAlert);
+ }
+ }
+
+
+
+ /**
+ * Returns the allAlertAdapter.
+ * @return the allAlertAdapter.
+ */
+
+ public static AllAlertAdapter getAllAlertAdapter() {
+ return allAlertAdapter;
+ }
+
+ /**
+ * Sets the allAlertAdapter.
+ * @param allAlertAdapter the allAlertAdapter.
+ */
+
+ public static void setAllAlertAdapter(AllAlertAdapter allAlertAdapter) {
+
+ AllAlertAdapterTest.allAlertAdapter = allAlertAdapter;
+ }
+ /**
+ * Returns the connected.
+ * @return the connected.
+ */
+
+ public static boolean isConnected() {
+ return connected;
+ }
+
+ /**
+ * Sets the connected.
+ * @param connected the connected.
+ */
+
+ public static void setConnected(boolean connected) {
+
+ AllAlertAdapterTest.connected = connected;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties b/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
@@ -0,0 +1 @@
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json b/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
new file mode 100644
index 0000000..c4f2a82
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
@@ -0,0 +1,42 @@
+{
+"title": "GeoMySql Schema",
+"type": "object",
+"properties": {
+
+ "city" : {
+ "type": "string"
+ },
+ "country" : {
+ "type": "string"
+ },
+ "dmaCode" :
+ {
+ "type": "string"
+ },
+ "geoHash" :
+ {
+ "type": "string"
+ },
+ "latitude" :
+ {
+ "type": "string"
+ },
+ "locID" :
+ {
+ "type": "string"
+ },
+ "location_point" :
+ {
+ "type": "string"
+ },
+ "longitude" :
+ {
+ "type": "string"
+ },
+ "postalCode" :
+ {
+ "type": "string"
+ }
+ },
+ "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config b/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
new file mode 100644
index 0000000..f6e5dd1
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
@@ -0,0 +1,8 @@
+#Alerts Bolt
+bolt.alerts.adapter=com.opensoc.alerts.adapters.AllAlertAdapter
+com.opensoc.alerts.adapters.AllAlertAdapter.whitelist_table_name = ip_whitelist
+com.opensoc.alerts.adapters.AllAlertAdapter.blacklist_table_name = ip_blacklist
+com.opensoc.alerts.adapters.AllAlertAdapter.quorum=zkpr1,zkpr2,zkpr3
+com.opensoc.alerts.adapters.AllAlertAdapter.port=2181
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_CACHE_SIZE_OBJECTS_NUM=3600
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_TIME_RETAIN_MINUTES=1000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/.gitignore
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/.gitignore b/metron-streaming/Metron-Common/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/metron-streaming/Metron-Common/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
new file mode 100644
index 0000000..ad1382f
--- /dev/null
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>OpenSOC-Common</artifactId>
+ <name>OpenSOC-Common</name>
+ <description>Components common to all enrichments</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <kafka.version>0.8.0</kafka.version>
+ <commons.config.version>1.10</commons.config.version>
+ <hbase.version>0.98.5-hadoop2</hbase.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>OpenSOC-Kraken-Repo</id>
+ <name>OpenSOC Kraken Repository</name>
+ <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${global_json_simple_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+
+ <artifactId>servlet-api</artifactId>
+
+ <groupId>javax.servlet</groupId>
+
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.0</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons.config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.krakenapps</groupId>
+ <artifactId>kraken-pcap</artifactId>
+ <version>1.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-validator</artifactId>
+ <version>${global_json_schema_validator_version}</version>
+ </dependency>
+ </dependencies>
+
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
+
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <compilerArgument>-Xlint:unchecked</compilerArgument>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml.versionsBackup b/metron-streaming/Metron-Common/pom.xml.versionsBackup
new file mode 100644
index 0000000..8ead949
--- /dev/null
+++ b/metron-streaming/Metron-Common/pom.xml.versionsBackup
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Streaming</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>OpenSOC-Common</artifactId>
+ <name>OpenSOC-Common</name>
+ <description>Components common to all enrichments</description>
+ <properties>
+ <json.simple.version>1.1.1</json.simple.version>
+
+ <storm.version>0.9.2-incubating</storm.version>
+ <kafka.version>0.8.0</kafka.version>
+ <metrics.version>3.0.2</metrics.version>
+ <commons.config.version>1.10</commons.config.version>
+ <hbase.version>0.98.5-hadoop2</hbase.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>Kraken-Repo</id>
+ <name>Kraken Repository</name>
+ <url>http://download.krakenapps.org</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${json.simple.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.0</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons.config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.krakenapps</groupId>
+ <artifactId>kraken-pcap</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ </dependencies>
+
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
+
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
new file mode 100644
index 0000000..58567a6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
@@ -0,0 +1,16 @@
+package com.opensoc.alerts.interfaces;
+
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsAdapter {
+
+ boolean initialize();
+
+ boolean refresh() throws Exception;
+
+ Map<String, JSONObject> alert(JSONObject raw_message);
+
+ boolean containsAlertId(String alert);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
new file mode 100644
index 0000000..e5e32b7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
@@ -0,0 +1,11 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsInterface {
+
+ public JSONObject getContent();
+ public void setContent(JSONObject content);
+ public String getUuid();
+ public void setUuid(String uuid);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
new file mode 100644
index 0000000..79dc0d6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+ JSONArray tag(JSONObject raw_message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
new file mode 100644
index 0000000..74f19a5
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
@@ -0,0 +1,119 @@
+package com.opensoc.configuration;
+
+
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.DefaultConfigurationBuilder;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration manager class which loads all 'config-definition.xml' files and
+ * creates a Configuration object which holds all properties from the underlying
+ * configuration resource
+ */
+public class ConfigurationManager {
+
+ /** configuration definition file name. */
+ private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
+
+ /** Stores a map with the configuration for each path specified. */
+ private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
+
+ /** The Constant LOGGER. */
+ private static final Logger LOGGER = Logger
+ .getLogger(ConfigurationManager.class);
+
+ /**
+ * Common method to load content of all configuration resources defined in
+ * 'config-definition.xml'.
+ *
+ * @param configDefFilePath
+ * the config def file path
+ * @return Configuration
+ */
+ public static Configuration getConfiguration(String configDefFilePath) {
+ if (configurationsCache.containsKey(configDefFilePath)) {
+ return configurationsCache.get(configDefFilePath);
+ }
+ CombinedConfiguration configuration = null;
+ synchronized (configurationsCache) {
+ if (configurationsCache.containsKey(configDefFilePath)) {
+ return configurationsCache.get(configDefFilePath);
+ }
+ DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
+ String fielPath = getConfigDefFilePath(configDefFilePath);
+ LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
+ builder.setFile(new File(fielPath));
+ try {
+ configuration = builder.getConfiguration(true);
+ configurationsCache.put(fielPath, configuration);
+ } catch (ConfigurationException e) {
+ LOGGER.info("Exception in loading property files.", e);
+ }
+ }
+ return configuration;
+ }
+
+ /**
+ * Removes the configuration created from a config definition file located at
+ * 'configDefFilePath'.
+ *
+ * @param configDefFilePath
+ * path to the config definition file
+ */
+ public static void clearConfiguration(String configDefFilePath) {
+ configurationsCache.remove(configDefFilePath);
+ }
+
+ /**
+ * Gets the configuration.
+ *
+ * @return the configuration
+ */
+ public static Configuration getConfiguration() {
+ return getConfiguration(null);
+ }
+
+ /**
+ * Returns the 'config-definition.xml' file path. 1. If the param
+ * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
+ * system property key 'configDefFilePath' has a valid value, returns the
+ * value 3. By default, it returns the file name 'config-definition.xml'
+ *
+ * @param configDefFilePath
+ * given input path to the config definition file
+ * @return the config def file path
+ */
+ private static String getConfigDefFilePath(String configDefFilePath) {
+ if (StringUtils.isNotEmpty(configDefFilePath)) {
+ return configDefFilePath;
+ }
+ return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
+ }
+
+ /**
+ * The main method.
+ *
+ * @param args
+ * the args
+ * @throws InterruptedException
+ * the interrupted exception
+ */
+ public static void main(String[] args) throws InterruptedException {
+ Configuration config = ConfigurationManager
+ .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
+ System.out.println("elastic.search.cluster ="
+ + config.getString("elastic.search.cluster"));
+ Thread.sleep(10000);
+ System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
+ + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
new file mode 100644
index 0000000..e19646a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
@@ -0,0 +1,11 @@
+package com.opensoc.dataloads.interfaces;
+
+import java.util.Iterator;
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+public interface ThreatIntelSource extends Iterator<JSONObject> {
+
+ void initializeSource(Configuration config);
+ void cleanupSource();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
new file mode 100644
index 0000000..a6fdd85
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface EnrichmentAdapter
+{
+ JSONObject enrich(String metadata);
+ boolean initializeAdapter();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
new file mode 100644
index 0000000..ef155f1
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
@@ -0,0 +1,118 @@
+package com.opensoc.hbase;
+
+
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.helpers.topology.ErrorGenerator;
+
+/**
+ * A Storm bolt for putting data into HBase.
+ * <p>
+ * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
+ * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ * @see TupleTableConfig
+ * @see HTableConnector
+ */
+@SuppressWarnings("serial")
+public class HBaseBolt implements IRichBolt {
+ private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
+
+ protected OutputCollector collector;
+ protected HTableConnector connector;
+ protected TupleTableConfig conf;
+ protected boolean autoAck = true;
+
+ private String _quorum;
+ private String _port;
+
+ public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
+ this.conf = conf;
+ _quorum = quorum;
+ _port = port;
+
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("rawtypes")
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+
+ try {
+ this.connector = new HTableConnector(conf, _quorum, _port);
+
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
+ }
+
+ /** {@inheritDoc} */
+
+ public void execute(Tuple input) {
+ try {
+ this.connector.getTable().put(conf.getPutFromTuple(input));
+ } catch (IOException ex) {
+
+ JSONObject error = ErrorGenerator.generateErrorMessage(
+ "Alerts problem: " + input.getBinary(0), ex);
+ collector.emit("error", new Values(error));
+
+ throw new RuntimeException(ex);
+ }
+
+ if (this.autoAck) {
+ this.collector.ack(input);
+ }
+ }
+
+ /** {@inheritDoc} */
+
+ public void cleanup() {
+ this.connector.close();
+ }
+
+ /** {@inheritDoc} */
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("error", new Fields("HBase"));
+ }
+
+ /** {@inheritDoc} */
+
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ /**
+ * @return the autoAck
+ */
+ public boolean isAutoAck() {
+ return autoAck;
+ }
+
+ /**
+ * @param autoAck the autoAck to set
+ */
+ public void setAutoAck(boolean autoAck) {
+ this.autoAck = autoAck;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
new file mode 100644
index 0000000..4070db7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
@@ -0,0 +1,146 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HTable;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+public class HBaseStreamPartitioner implements CustomStreamGrouping {
+
+ private static final long serialVersionUID = -148324019395976092L;
+ private String[] regionStartKeys = { "0" };
+ private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
+
+ private List<Integer> targetTasks = null;
+ private int targetTasksSize = 0;
+ private int rowKeyFieldIndex = 0;
+ private String tableName = null;
+ private long regionCheckTime = 0;
+ private int regionInforRefreshIntervalInMins = 60;
+ private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+
+ HTable hTable = null;;
+
+
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+
+ System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
+ this.targetTasks = targetTasks;
+ this.targetTasksSize = this.targetTasks.size();
+
+ Configuration conf = HBaseConfiguration.create();
+ try {
+ hTable = new HTable(conf, tableName);
+ refreshRegionInfo(tableName);
+
+ System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void prepare() {
+
+ System.out.println("preparing HBaseStreamPartitioner for streamId " );
+
+ Configuration conf = HBaseConfiguration.create();
+ try {
+ hTable = new HTable(conf, tableName);
+ refreshRegionInfo(tableName);
+
+ System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
+ System.out.println("Created HBaseStreamPartitioner ");
+ this.rowKeyFieldIndex = rowKeyFieldIndex;
+ this.tableName = tableName;
+ this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
+ this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+
+ }
+
+
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ List<Integer> choosenTasks = null;
+ System.out.println("Choosing task for taskId " + taskId + " and values " + values);
+
+ if (regionInforRefreshIntervalInMillis > (System.currentTimeMillis() - regionCheckTime)) {
+ try {
+ refreshRegionInfo(tableName);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
+
+ if (regionIndex < targetTasksSize) {
+ choosenTasks = Arrays.asList(regionIndex);
+
+ } else {
+ choosenTasks = Arrays.asList(regionIndex % targetTasksSize);
+ }
+ System.out.println("Choosen tasks are " + choosenTasks);
+
+ return choosenTasks;
+
+
+ }
+
+
+
+ public int getRegionIndex(String key) {
+ int index = Arrays.binarySearch(regionStartKeys, key);
+ if (index < -1) {
+ index = (index + 2) * -1;
+ } else if (index == -1) {
+ index = 0;
+ }
+
+ return index;
+ }
+
+ private void refreshRegionInfo(String tableName) throws IOException {
+
+ System.out.println("in refreshRegionInfo ");
+
+ Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
+
+ synchronized (regionStartKeys) {
+ synchronized (regionStartKeyRegionNameMap) {
+ regionStartKeys = new String[regionMap.size()];
+ int index = 0;
+ String startKey = null;
+ regionStartKeyRegionNameMap.clear();
+ for (HRegionInfo regionInfo : regionMap.keySet()) {
+ startKey = new String(regionInfo.getStartKey());
+ regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
+ regionStartKeys[index] = startKey;
+ index++;
+ }
+
+ Arrays.sort(regionStartKeys);
+ regionCheckTime = System.currentTimeMillis();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
new file mode 100644
index 0000000..d0aa0b4
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
@@ -0,0 +1,106 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import backtype.storm.generated.Bolt;
+
+/**
+ * HTable connector for Storm {@link Bolt}
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ */
+@SuppressWarnings("serial")
+public class HTableConnector implements Serializable {
+ private static final Logger LOG = Logger.getLogger(HTableConnector.class);
+
+ private Configuration conf;
+ protected HTable table;
+ private String tableName;
+
+ /**
+ * Initialize HTable connection
+ * @param conf The {@link TupleTableConfig}
+ * @throws IOException
+ */
+ public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+ this.tableName = conf.getTableName();
+ this.conf = HBaseConfiguration.create();
+
+ if(_quorum != null && _port != null)
+ {
+ this.conf.set("hbase.zookeeper.quorum", _quorum);
+ this.conf.set("hbase.zookeeper.property.clientPort", _port);
+ }
+
+ LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
+ this.conf.get("hbase.rootdir")));
+
+ try {
+ this.table = new HTable(this.conf, this.tableName);
+ } catch (IOException ex) {
+ throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
+ }
+
+ if (conf.isBatch()) {
+ // Enable client-side write buffer
+ this.table.setAutoFlush(false, true);
+ LOG.info("Enabled client-side write buffer");
+ }
+
+ // If set, override write buffer size
+ if (conf.getWriteBufferSize() > 0) {
+ try {
+ this.table.setWriteBufferSize(conf.getWriteBufferSize());
+
+ LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
+ } catch (IOException ex) {
+ LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
+ ex);
+ }
+ }
+
+ // Check the configured column families exist
+ for (String cf : conf.getColumnFamilies()) {
+ if (!columnFamilyExists(cf)) {
+ throw new RuntimeException(String.format(
+ "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
+ }
+ }
+ }
+
+ /**
+ * Checks to see if table contains the given column family
+ * @param columnFamily The column family name
+ * @return boolean
+ * @throws IOException
+ */
+ private boolean columnFamilyExists(final String columnFamily) throws IOException {
+ return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
+ }
+
+ /**
+ * @return the table
+ */
+ public HTable getTable() {
+ return table;
+ }
+
+ /**
+ * Close the table
+ */
+ public void close() {
+ try {
+ this.table.close();
+ } catch (IOException ex) {
+ LOG.error("Unable to close connection to HBase table " + tableName, ex);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
new file mode 100644
index 0000000..71f8c9a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
@@ -0,0 +1,279 @@
+package com.opensoc.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Configuration for Storm {@link Tuple} to HBase serialization.
+ */
+@SuppressWarnings("serial")
+public class TupleTableConfig implements Serializable {
+
+ public static final long DEFAULT_INCREMENT = 1L;
+
+ private String tableName;
+ protected String tupleRowKeyField;
+ protected String tupleTimestampField;
+ protected Map<String, Set<String>> columnFamilies;
+ private boolean batch = true;
+ protected Durability durability = Durability.USE_DEFAULT;
+ private long writeBufferSize = 0L;
+
+ /**
+ * Initialize configuration
+ *
+ * @param table
+ * The HBase table name
+ * @param rowKeyField
+ * The {@link Tuple} field used to set the rowKey
+ */
+ public TupleTableConfig(final String table, final String rowKeyField) {
+ this.tableName = table;
+ this.tupleRowKeyField = rowKeyField;
+ this.tupleTimestampField = "";
+ this.columnFamilies = new HashMap<String, Set<String>>();
+ }
+
+ /**
+ * Initialize configuration
+ *
+ * @param table
+ * The HBase table name
+ * @param rowKeyField
+ * The {@link Tuple} field used to set the rowKey
+ * @param timestampField
+ * The {@link Tuple} field used to set the timestamp
+ */
+ public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
+ this.tableName = table;
+ this.tupleRowKeyField = rowKeyField;
+ this.tupleTimestampField = timestampField;
+ this.columnFamilies = new HashMap<String, Set<String>>();
+ }
+
+ /**
+ * Add column family and column qualifier to be extracted from tuple
+ *
+ * @param columnFamily
+ * The column family name
+ * @param columnQualifier
+ * The column qualifier name
+ */
+ public void addColumn(final String columnFamily, final String columnQualifier) {
+ Set<String> columns = this.columnFamilies.get(columnFamily);
+
+ if (columns == null) {
+ columns = new HashSet<String>();
+ }
+ columns.add(columnQualifier);
+
+ this.columnFamilies.put(columnFamily, columns);
+ }
+
+ /**
+ * Creates a HBase {@link Put} from a Storm {@link Tuple}
+ *
+ * @param tuple
+ * The {@link Tuple}
+ * @return {@link Put}
+ */
+ public Put getPutFromTuple(final Tuple tuple) {
+ byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+
+ long ts = 0;
+ if (!tupleTimestampField.equals("")) {
+ ts = tuple.getLongByField(tupleTimestampField);
+ }
+
+ Put p = new Put(rowKey);
+
+ p.setDurability(durability);
+
+ if (columnFamilies.size() > 0) {
+ for (String cf : columnFamilies.keySet()) {
+ byte[] cfBytes = Bytes.toBytes(cf);
+ for (String cq : columnFamilies.get(cf)) {
+ byte[] cqBytes = Bytes.toBytes(cq);
+ byte[] val = tuple.getBinaryByField(cq);
+
+ if (ts > 0) {
+ p.add(cfBytes, cqBytes, ts, val);
+ } else {
+ p.add(cfBytes, cqBytes, val);
+ }
+ }
+ }
+ }
+
+ return p;
+ }
+
+ /**
+ * Creates a HBase {@link Increment} from a Storm {@link Tuple}
+ *
+ * @param tuple
+ * The {@link Tuple}
+ * @param increment
+ * The amount to increment the counter by
+ * @return {@link Increment}
+ */
+ public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
+ byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+
+ Increment inc = new Increment(rowKey);
+ inc.setDurability(durability);
+
+ if (columnFamilies.size() > 0) {
+ for (String cf : columnFamilies.keySet()) {
+ byte[] cfBytes = Bytes.toBytes(cf);
+ for (String cq : columnFamilies.get(cf)) {
+ byte[] val;
+ try {
+ val = Bytes.toBytes(tuple.getStringByField(cq));
+ } catch (IllegalArgumentException ex) {
+ // if cq isn't a tuple field, use cq for counter instead of tuple
+ // value
+ val = Bytes.toBytes(cq);
+ }
+ inc.addColumn(cfBytes, val, increment);
+ }
+ }
+ }
+
+ return inc;
+ }
+
+ /**
+ * Increment the counter for the given family and column by the specified
+ * amount
+ * <p>
+ * If the family and column already exist in the Increment the counter value
+ * is incremented by the specified amount rather than overridden, as it is in
+ * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
+ *
+ * @param inc
+ * The {@link Increment} to update
+ * @param family
+ * The column family
+ * @param qualifier
+ * The column qualifier
+ * @param amount
+ * The amount to increment the counter by
+ */
+ public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
+
+ NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
+ if (set == null) {
+ set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ }
+
+ // If qualifier exists, increment amount
+ Long counter = set.get(qualifier);
+ if (counter == null) {
+ counter = 0L;
+ }
+ set.put(qualifier, amount + counter);
+
+ inc.getFamilyMapOfLongs().put(family, set);
+ }
+
+ /**
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return Whether batch mode is enabled
+ */
+ public boolean isBatch() {
+ return batch;
+ }
+
+ /**
+ * @param batch
+ * Whether to enable HBase's client-side write buffer.
+ * <p>
+ * When enabled your bolt will store put operations locally until the
+ * write buffer is full, so they can be sent to HBase in a single RPC
+ * call. When disabled each put operation is effectively an RPC and
+ * is sent straight to HBase. As your bolt can process thousands of
+ * values per second it is recommended that the write buffer is
+ * enabled.
+ * <p>
+ * Enabled by default
+ */
+ public void setBatch(boolean batch) {
+ this.batch = batch;
+ }
+
+ /**
+ * @param setDurability
+ * Sets whether to write to HBase's edit log.
+ * <p>
+ * Setting to false will mean fewer operations to perform when
+ * writing to HBase and hence better performance, but changes that
+ * haven't been flushed to a store file will be lost in the event of
+ * HBase failure
+ * <p>
+ * Enabled by default
+ */
+ public void setDurability(Durability durability) {
+ this.durability = durability;
+ }
+
+
+ public Durability getDurability() {
+ return durability;
+ }
+
+ /**
+ * @param writeBufferSize
+ * Overrides the client-side write buffer size.
+ * <p>
+ * By default the write buffer size is 2 MB (2097152 bytes). If you
+ * are storing larger data, you may want to consider increasing this
+ * value to allow your bolt to efficiently group together a larger
+ * number of records per RPC
+ * <p>
+ * Overrides the write buffer size you have set in your
+ * hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
+ */
+ public void setWriteBufferSize(long writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ /**
+ * @return the writeBufferSize
+ */
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ /**
+ * @return A Set of configured column families
+ */
+ public Set<String> getColumnFamilies() {
+ return this.columnFamilies.keySet();
+ }
+
+ /**
+ * @return the tupleRowKeyField
+ */
+ public String getTupleRowKeyField() {
+ return tupleRowKeyField;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
new file mode 100644
index 0000000..70f8683
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
@@ -0,0 +1,110 @@
+package com.opensoc.helpers.services;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class PcapServiceCli {
+
+ private String[] args = null;
+ private Options options = new Options();
+
+ int port = 8081;
+ String uri = "/pcapGetter";
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public PcapServiceCli(String[] args) {
+
+ this.args = args;
+
+ Option help = new Option("h", "Display help menue");
+ options.addOption(help);
+ options.addOption(
+ "port",
+ true,
+ "OPTIONAL ARGUMENT [portnumber] If this argument sets the port for starting the service. If this argument is not set the port will start on defaut port 8081");
+ options.addOption(
+ "endpoint_uri",
+ true,
+ "OPTIONAL ARGUMENT [/uri/to/service] This sets the URI for the service to be hosted. The default URI is /pcapGetter");
+ }
+
+ public void parse() {
+ CommandLineParser parser = new BasicParser();
+
+ CommandLine cmd = null;
+
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e1) {
+
+ e1.printStackTrace();
+ }
+
+ if (cmd.hasOption("h"))
+ help();
+
+ if (cmd.hasOption("port")) {
+
+ try {
+ port = Integer.parseInt(cmd.getOptionValue("port").trim());
+ } catch (Exception e) {
+
+ System.out.println("[OpenSOC] Invalid value for port entered");
+ help();
+ }
+ }
+ if (cmd.hasOption("endpoint_uri")) {
+
+ try {
+
+ if (uri == null || uri.equals(""))
+ throw new Exception("invalid uri");
+
+ uri = cmd.getOptionValue("uri").trim();
+
+ if (uri.charAt(0) != '/')
+ uri = "/" + uri;
+
+ if (uri.charAt(uri.length()) == '/')
+ uri = uri.substring(0, uri.length() - 1);
+
+ } catch (Exception e) {
+ System.out.println("[OpenSOC] Invalid URI entered");
+ help();
+ }
+ }
+
+ }
+
+ private void help() {
+ // This prints out some help
+ HelpFormatter formater = new HelpFormatter();
+
+ formater.printHelp("Topology Options:", options);
+
+ // System.out
+ // .println("[OpenSOC] Example usage: \n storm jar OpenSOC-Topologies-0.3BETA-SNAPSHOT.jar com.opensoc.topology.Bro -local_mode true -config_path OpenSOC_Configs/ -generator_spout true");
+
+ System.exit(0);
+ }
+}