You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by si...@apache.org on 2016/01/14 18:03:27 UTC

[32/85] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-PlatformScripts/WhoisEnrichment/Whois_CSV_to_JSON.py
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-PlatformScripts/WhoisEnrichment/Whois_CSV_to_JSON.py b/metron-streaming/Metron-PlatformScripts/WhoisEnrichment/Whois_CSV_to_JSON.py
new file mode 100755
index 0000000..2091418
--- /dev/null
+++ b/metron-streaming/Metron-PlatformScripts/WhoisEnrichment/Whois_CSV_to_JSON.py
@@ -0,0 +1,208 @@
+#!/usr/bin/python
+
+"""
+Copyright 2014 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import os
+import csv
+import json
+import multiprocessing
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+
+def is_field_excluded(fieldname=None):
+    """
+    Checks to see if a field name is a member of a list of names to exclude. Modify to suit your own list.
+
+    :param fieldname: A string representing a field name
+    :return: True or False
+    """
+    import re
+
+    # List of fields names to exclude
+    excluded_fields = [
+        'Audit_auditUpdatedDate',
+        #'domainName'
+    ]
+
+    if fieldname in excluded_fields:
+        return True
+
+    # Regexes to match for exclusion
+    excluded_regexes = [
+        ['_rawText$', re.IGNORECASE],
+    ]
+
+    for regex in excluded_regexes:
+        if re.search(regex[0], fieldname, regex[1]):
+            return True
+
+    return False
+
+
+def process_csv(in_filename, out_filename):
+    """
+    Processes a CSV file of WHOIS data and converts each line to a JSON element, skipping specific fields that
+    are not deemed necessary (domainName, *_rawText, Audit_auditUpdatedDate)
+
+    :param in_filename: Input CSV filename with full path
+    :param out_filename: Output JSON filename with full path
+    :return: None
+    """
+    if out_filename:
+        out_fh = open(out_filename, 'wb')
+        logging.debug('%s: Converting %s to %s' % (multiprocessing.current_process().name, in_filename, out_filename))
+    else:
+        logging.debug('%s: Analyzing %s' % (multiprocessing.current_process().name, in_filename))
+
+    with open(in_filename, 'rb') as f:
+        reader = csv.DictReader(f, delimiter=',', quotechar='"')
+        line_num = 0
+        try:
+            for row in reader:
+                line_num += 1
+                try:
+                    if out_filename:
+                        # json conversion and output
+                        new_row = {}
+                        for field in reader.fieldnames:
+                            # fields we don't want include these + anything with rawText
+                            #if field not in ['Audit_auditUpdatedDate', 'domainName'] and not field.endswith('_rawText'):
+                            if not is_field_excluded(field):
+                                new_row[field] = row.get(field)
+                        json.dump(new_row, out_fh)
+                        out_fh.write('\n')
+                    else:
+                        # analysis .. check to be sure fileheader and csv row counts match
+                        if len(row) != len(reader.fieldnames):
+                            raise Exception('Field count mismatch: row: %s / fields: %s' % (len(row), len(reader.fieldnames)))
+                except Exception, e:
+                    logging.warn("Error with file %s, line %s: %s" % (in_filename, line_num, e))
+
+            if not out_filename:
+                logging.info('Analyzed %s: OK' % in_filename)
+        except Exception, e:
+            logging.warn(e)
+
+        out_fh.close()
+
+
+##-------------------------------------------------------------------------
+
+def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
+    """
+    Generates a multiprocessing.Pool() queue with a list of input and output files to be processed with processCSV.
+    Files are added by walking the source_dir and adding any file with a CSV extension. Output is placed into a single
+    directory for processing. Output filenames are generated using the first part of the directory name so a file
+    named source_dir/com/1.csv would become outputDir/com_1.json
+
+    :param source_dir: Source directory of CSV files
+    :param output_dir: Output directory for resultant JSON files
+    :param max_processes: Maximum number of processes run
+    :return:
+    """
+    logging.info("Processing Whois files from %s" % source_dir)
+
+    if output_dir and not os.path.exists(output_dir):
+        logging.debug("Creating output directory %s" % output_dir)
+        os.makedirs(output_dir)
+
+    logging.info("Starting %s pool workers" % max_processes)
+
+    if sys.version.startswith('2.6'):
+        # no maxtaskperchild in 2.6
+        pool = multiprocessing.Pool(processes=max_processes)
+    else:
+        pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)
+
+    filecount = 0
+    for dirname, dirnames, filenames in os.walk(source_dir):
+        for filename in filenames:
+            if filename.endswith('.csv'):
+                # output files go to outputDir and are named using the last subdirectory from the dirname
+                if output_dir:
+                    out_filename = filename.replace('csv', 'json')
+                    out_filename = os.path.join(output_dir, '%s_%s' % (os.path.split(dirname)[-1], out_filename))
+
+                    # if file does not exist or if overwrite is true, add file process to the pool
+                    if not os.path.isfile(out_filename) or overwrite:
+                        pool.apply_async(process_csv, args=(os.path.join(dirname, filename), out_filename))
+                        filecount += 1
+                    else:
+                        logging.info("Skipping %s, %s exists and overwrite is false" % (filename, out_filename))
+                else:
+                    # no outputdir so we just analyze the files
+                    pool.apply_async(process_csv, args=(os.path.join(dirname, filename), None))
+                    filecount += 1
+
+    try:
+        pool.close()
+        logging.info("Starting activities on %s CSV files" % filecount)
+        pool.join()
+    except KeyboardInterrupt:
+        logging.info("Aborting")
+        pool.terminate()
+
+    logging.info("Completed")
+
+
+##-------------------------------------------------------------------------
+
+if __name__ == "__main__":
+
+    max_cpu = multiprocessing.cpu_count()
+
+    from optparse import OptionParser
+    parser = OptionParser()
+    parser.add_option('-s', '--source', dest='source_dir', action='store',
+                      help='Source directory to walk for CSV files')
+    parser.add_option('-o', '--output', dest='out_dir', action='store',
+                      help='Output directory for JSON files')
+    parser.add_option('-O', '--overwrite', dest='overwrite', action='store_true',
+                      help='Overwrite existing files in output directory')
+    parser.add_option('-p', '--processes', dest='max_processes', action='store', default=max_cpu, type='int',
+                      help='Max number of processes to spawn')
+    parser.add_option('-a', '--analyze', dest='analyze', action='store_true',
+                      help='Analyze CSV files for validity, no file output')
+    parser.add_option('-d', '--debug', dest='debug', action='store_true',
+                      help='Enable debug messages')
+
+    (options, args) = parser.parse_args()
+
+    if not options.source_dir:
+        logging.error("Source directory required")
+        sys.exit(-1)
+
+    if not options.out_dir or options.analyze:
+        out_dir = None
+    elif not options.out_dir:
+        logging.error("Ouput directory or analysis option required")
+        sys.exit(-1)
+    else:
+        out_dir = options.out_dir
+
+    if options.max_processes > max_cpu:
+        logging.warn('Max Processes (%s) is greater than available Processors (%s)' % (options.max_processes, max_cpu))
+
+    if options.debug:
+        # enable debug level and multiprocessing debugging
+        logging.basicConfig(level=logging.DEBUG)
+        multiprocessing.log_to_stderr(logging.DEBUG)
+
+    process_files(options.source_dir, options.out_dir, options.max_processes, options.overwrite)
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
new file mode 100644
index 0000000..3ec016f
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.6BETA</version>
+	</parent>
+	<artifactId>OpenSOC-Topologies</artifactId>
+	<description>OpenSOC Topologies</description>
+
+	<properties>
+		<storm.hdfs.version>0.9.1.2.1.1.0-385</storm.hdfs.version>
+		<cli.version>20040117.000000</cli.version>
+		<commons.config.version>1.10</commons.config.version>
+	</properties>
+	<repositories>
+		<repository>
+			<id>github-snapshots</id>
+			<url>http://oss.sonatype.org/content/repositories/snapshots/</url>
+		</repository>
+	</repositories>
+	<dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Alerts</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-EnrichmentAdapters</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-MessageParsers</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Indexing</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-client</artifactId>
+			<version>${global_hadoop_version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${global_hadoop_version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.8.2</artifactId>
+			<version>${global_kafka_version}</version>
+			<exclusions>
+				<!--exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> 
+					</exclusion -->
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-kafka</artifactId>
+			<version>${global_storm_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm</artifactId>
+			<version>${global_storm_version}</version>
+			<type>pom</type>
+			<scope>provided</scope>
+		</dependency>
+		
+
+		<dependency>
+			<groupId>com.github.ptgoetz</groupId>
+			<artifactId>storm-hbase</artifactId>
+			<version>0.1.2</version>
+		</dependency>
+		<!-- dependency> <groupId>com.github.ptgoetz</groupId> <artifactId>storm-hdfs</artifactId> 
+			<version>0.1.2</version> </dependency -->
+		<dependency>
+			<groupId>com.github.sheetaldolas</groupId>
+			<artifactId>storm-hdfs</artifactId>
+			<version>0.0.7-SNAPSHOT</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.storm</groupId>
+					<artifactId>storm-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+	<build>
+		<resources>
+			<resource>
+				<directory>src/main/resources</directory>
+			</resource>
+		</resources>
+		<plugins>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>1.4</version>
+				<configuration>
+					<createDependencyReducedPom>true</createDependencyReducedPom>
+				</configuration>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>storm:storm-core:*</exclude>
+									<exclude>storm:storm-lib:*</exclude>
+									<exclude>*slf4j*</exclude>
+								</excludes>
+							</artifactSet>
+
+							<transformers>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+									<resource>.yaml</resource>
+								</transformer>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass></mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml.versionsBackup b/metron-streaming/Metron-Topologies/pom.xml.versionsBackup
new file mode 100644
index 0000000..98ada2d
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/pom.xml.versionsBackup
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-Topologies</artifactId>
+	<description>OpenSOC Topologies</description>
+
+	<properties>
+		<opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+		<opensoc.alerts.version>0.0.1-SNAPSHOT</opensoc.alerts.version>
+		<enrichment.geo.version>0.0.1-SNAPSHOT</enrichment.geo.version>
+		<enrichment.whois.version>0.0.1-SNAPSHOT</enrichment.whois.version>
+		<geo.enrichment.version>0.0.1-SNAPSHOT</geo.enrichment.version>
+		<parsers.version>0.0.1-SNAPSHOT</parsers.version>
+		<indexing.version>0.0.1-SNAPSHOT</indexing.version>
+		<storm.version>0.9.2-incubating</storm.version>
+		<storm.hdfs.version>0.9.1.2.1.1.0-385</storm.hdfs.version>
+		<hadoop.version>2.2.0</hadoop.version>
+		<cli.version>20040117.000000</cli.version>
+		<kafka.storm.version>0.9.2-incubating</kafka.storm.version>
+		<cif.enrichment.version>0.0.1-SNAPSHOT</cif.enrichment.version>
+		<lancope.enrichment.version>0.0.1-SNAPSHOT</lancope.enrichment.version>
+		<commons.config.version>1.10</commons.config.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${opensoc.common.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Alerts</artifactId>
+			<version>${opensoc.alerts.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-EnrichmentAdapters</artifactId>
+			<version>${enrichment.geo.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-MessageParsers</artifactId>
+			<version>${parsers.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Indexing</artifactId>
+			<version>${indexing.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-client</artifactId>
+			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.8.2</artifactId>
+			<version>0.8.1</version>
+			<exclusions>
+				<!--exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> 
+					</exclusion -->
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-kafka</artifactId>
+			<version>${storm.version}</version>
+
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm</artifactId>
+			<version>${storm.version}</version>
+			<type>pom</type>
+			<scope>provided</scope>
+		</dependency>
+<dependency>
+    <groupId>com.github.ptgoetz</groupId>
+    <artifactId>storm-hbase</artifactId>
+    <version>0.1.2</version>
+</dependency>
+		<dependency>
+			<groupId>com.github.ptgoetz</groupId>
+			<artifactId>storm-hdfs</artifactId>
+			<version>0.1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.config.version}</version>
+		</dependency>
+  		<dependency>
+  			<groupId>junit</groupId>
+  			<artifactId>junit</artifactId>
+  			<version>3.8.2</version>
+  		</dependency>		
+	</dependencies>
+	<build>
+		<resources>
+			<resource>
+				<directory>src/main/resources</directory>
+			</resource>
+		</resources>
+		<plugins>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>1.4</version>
+				<configuration>
+					<createDependencyReducedPom>true</createDependencyReducedPom>
+				</configuration>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>storm:storm-core:*</exclude>
+									<exclude>storm:storm-lib:*</exclude>
+									<exclude>*slf4j*</exclude>
+								</excludes>
+							</artifactSet>
+
+							<transformers>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+									<resource>.yaml</resource>
+								</transformer>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass></mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/readme.md
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/readme.md b/metron-streaming/Metron-Topologies/readme.md
new file mode 100644
index 0000000..feac62d
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/readme.md
@@ -0,0 +1,47 @@
+#OpenSOC-Topologies
+
+#Module Description
+
+This module provides example topologies that show how to drive OpenSOC modules and components.  The sample topologies process PCAP, Ise, Lancope, and Bro telemetry.
+
+##Launching Topologies
+
+
+```
+
+storm jar OpenSOC-Topologies-0.6BETA.jar com.opensoc.topology.Pcap
+storm jar OpenSOC-Topologies-0.6BETA.jar com.opensoc.topology.Sourcefire
+storm jar OpenSOC-Topologies-0.6BETA.jar com.opensoc.topology.Lancope
+storm jar OpenSOC-Topologies-0.6BETA.jar com.opensoc.topology.Ise
+
+Topology Options:
+-config_path <arg>       OPTIONAL ARGUMENT [/path/to/configs] Path to
+configuration folder. If not provided topology
+will initialize with default configs
+-debug <arg>             OPTIONAL ARGUMENT [true|false] Storm debugging
+enabled.  Default value is true
+-generator_spout <arg>   REQUIRED ARGUMENT [true|false] Turn on test
+generator spout.  Default is set to false.  If
+test generator spout is turned on then kafka
+spout is turned off.  Instead the generator
+spout will read telemetry from file and ingest
+it into a topology
+-h                       Display help menue
+-local_mode <arg>        REQUIRED ARGUMENT [true|false] Local mode or
+cluster mode.  If set to true the topology will
+run in local mode.  If set to false the topology
+will be deployed to Storm nimbus
+```
+
+##Topology Configs
+
+The sample topologies provided use a specific directory structure.  The example directory structure was checked in here:
+
+```
+https://github.com/OpenSOC/opensoc-streaming/tree/master/OpenSOC-Topologies/src/main/resources/OpenSOC_Configs
+```
+
+* topology.conf - settings specific to each topology
+* features_enabled.conf - turn on and off features for each topology and control parallelism
+* metrics.conf - export definitions for metrics to Graphite
+* topology_identifier.conf - customer-specific tag (since we deploy to multiple data centers, we need to identify where the alerts are coming from and which topologies we are looking at when we need to debug)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/bolts/PrintingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/bolts/PrintingBolt.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/bolts/PrintingBolt.java
new file mode 100644
index 0000000..4cd3a68
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/bolts/PrintingBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.test.bolts;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Debugging bolt that prints every tuple it receives to standard output.
+ * It keeps no state, emits nothing and declares no output fields.
+ */
+@SuppressWarnings("serial")
+public class PrintingBolt extends BaseRichBolt {
+
+	/**
+	 * Intentionally empty: the bolt needs no configuration and does not keep
+	 * the collector, since it never emits.
+	 */
+	@Override
+	@SuppressWarnings("rawtypes")
+	public void prepare(Map stormConf, TopologyContext context,
+			OutputCollector collector) {
+	}
+
+	/**
+	 * Prints the received tuple to standard output.
+	 *
+	 * @param input the tuple delivered to this bolt
+	 */
+	@Override
+	public void execute(Tuple input) {
+		// NOTE(review): the tuple is not acked here - confirm that is acceptable
+		// for the spouts this bolt is wired to.
+		System.out.println("---------[RECEIVED] " + input);
+	}
+
+	/**
+	 * Intentionally empty: this bolt is a sink and declares no output fields.
+	 */
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/filereaders/FileReader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/filereaders/FileReader.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/filereaders/FileReader.java
new file mode 100644
index 0000000..ee390b5
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/filereaders/FileReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.test.filereaders;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+public class FileReader {
+	public List<String> readFromFile(String filename) throws IOException 
+	{
+		
+		System.out.println("Reading stream from " + filename);
+
+		List<String> lines = new LinkedList<String>();
+
+		InputStream stream = Thread.currentThread().getContextClassLoader()
+				.getResourceAsStream(filename);
+
+		DataInputStream in = new DataInputStream(stream);
+		BufferedReader br = new BufferedReader(new InputStreamReader(in));
+		String strLine;
+		while ((strLine = br.readLine()) != null) 
+		{
+			//System.out.println("-----------------I READ: " + strLine);
+			lines.add(strLine);
+		}
+		//br.close();
+
+		return lines;
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/GenericInternalTestSpout.java
new file mode 100644
index 0000000..ced5266
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.test.spouts;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.opensoc.test.filereaders.FileReader;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+
+/**
+ * Test spout that replays the lines of a classpath resource as tuples.
+ *
+ * Each call to {@link #nextTuple()} sleeps for the configured delay and then
+ * emits the next line (as bytes in the platform default charset) in a
+ * single-field tuple named "message". When repeating is enabled the spout
+ * starts over from the first line after the last one has been emitted.
+ */
+public class GenericInternalTestSpout extends BaseRichSpout {
+
+	private static final long serialVersionUID = -2379344923143372543L;
+
+	/** Lines loaded in open(); empty when the resource could not be read. */
+	List<String> jsons;
+	
+	private String _filename;
+	private int _delay = 100;
+	private boolean _repeating = true;
+	
+	private SpoutOutputCollector _collector;
+	private FileReader Reader;
+	/** Index into jsons of the next line to emit. */
+	private int cnt = 0;
+	
+	/**
+	 * Sets the classpath resource whose lines are replayed.
+	 *
+	 * @param filename resource name handed to FileReader.readFromFile
+	 * @return this spout, for call chaining
+	 */
+	public GenericInternalTestSpout withFilename(String filename)
+	{
+		_filename = filename;
+		return this;
+	}
+
+	/**
+	 * Sets the pause applied at the start of every nextTuple() call.
+	 *
+	 * @param delay value passed to Utils.sleep (default 100)
+	 * @return this spout, for call chaining
+	 */
+	public GenericInternalTestSpout withMilisecondDelay(int delay)
+	{
+		_delay = delay;
+		return this;
+	}
+	
+	/**
+	 * Chooses whether the lines are replayed endlessly or only once.
+	 *
+	 * @param repeating true (the default) to wrap around after the last line
+	 * @return this spout, for call chaining
+	 */
+	public GenericInternalTestSpout withRepeating(boolean repeating)
+	{
+		_repeating = repeating;
+		return this;
+	}
+
+	/**
+	 * Stores the collector and loads the configured resource into memory.
+	 *
+	 * A read failure is logged to stdout and leaves the spout with an empty
+	 * line list, so nextTuple() then emits nothing instead of failing.
+	 */
+	@SuppressWarnings("rawtypes") 
+	public void open(Map conf, TopologyContext context,
+			SpoutOutputCollector collector) {
+		
+		_collector = collector;
+		try {
+			Reader =  new FileReader();
+			jsons = Reader.readFromFile(_filename);
+		} catch (IOException e) 
+		{
+			System.out.println("Could not read sample JSONs");
+			e.printStackTrace();
+		}
+
+		if (jsons == null)
+		{
+			// Keep nextTuple() safe after a failed read.
+			jsons = new java.util.ArrayList<String>();
+		}
+	}
+
+	/**
+	 * Sleeps for the configured delay, then emits the next line if there is one.
+	 */
+	public void nextTuple() {
+		Utils.sleep(_delay);
+		
+		if (jsons == null || jsons.isEmpty())
+			return;
+
+		if (cnt >= jsons.size())
+		{
+			// All lines have been emitted: either wrap around or stay idle.
+			// Not advancing cnt any further also keeps it from overflowing.
+			if (!_repeating)
+				return;
+			cnt = 0;
+		}
+
+		_collector.emit(new Values(jsons.get(cnt).getBytes()));
+		cnt++;
+	}
+
+	@Override
+	public void ack(Object id) {
+	}
+
+	@Override
+	public void fail(Object id) {
+	}
+
+	/**
+	 * Declares the single output field "message" on the default stream.
+	 */
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("message"));
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/PcapSimulatorSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/PcapSimulatorSpout.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/PcapSimulatorSpout.java
new file mode 100644
index 0000000..bf53914
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/test/spouts/PcapSimulatorSpout.java
@@ -0,0 +1,153 @@
+package com.opensoc.test.spouts;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.opensoc.pcap.PcapUtils;
+
+
+/**
+ * The Class PcapSimulatorSpout.
+ */
+public class PcapSimulatorSpout extends BaseRichSpout {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -5878104600899840638L;
+
+  /** The collector. */
+  private SpoutOutputCollector collector = null;
+
+  /** The Constant randomIpSegmentGenerator. */
+  private static final Random randomIpSegmentGenerator = new Random(255);
+
+  /** The Constant randomPortGenerator. */
+  private static final Random randomPortGenerator = new Random(64000);
+
+  /** The Constant randomJsonGenerator. */
+  private static final Random randomJsonGenerator = new Random(8);
+
+  /** The Constant randomProtocolGenerator. */
+  private static final Random randomProtocolGenerator = new Random(255);
+
+  /** The message size. */
+  private static int messageSize = 30000;
+
+  /** The pcap. */
+  private static byte[] pcap = new byte[messageSize];
+
+  /** The Constant randomPcapGenerator. */
+  private static final Random randomPcapGenerator = new Random();
+
+  /** The json doc. */
+  private static String jsonDoc;
+
+  /** The ts. */
+  private static long ts;
+
+  /** The group key. */
+  private static String groupKey;
+
+  /** The ip addr. */
+  StringBuffer ipAddr = new StringBuffer();
+
+  /** The Constant jsonDocs. */
+  private static final String[] jsonDocs = {
+      "{ \"header\": { \"IncLen\": 124,\"OrigLen\": 124,\"TsSec\": 1391740061,\"TsUsec\": 723610},\"ipv4header\": { \"Destination\": -1407317716,\"DestinationAddress\": \"172.30.9.44\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 22550,\"Id\": 30686,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317715,\"SourceAddress\": \"172.30.9.45\",\"Tos\": 0,\"TotalLength\": 110,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 1331776820,\"Checksum\": 21822,\"DataLength\": 58,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.44\",\"DestinationPort\": 9092,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": 337331842,\"SessionKey\": \"172.30.9.45:56412 -> 172.30.9.44:9092\",\"SourceAddress\": \"172.30.9.45\",\"SourcePort\": 56412,\"TotalLength\": 90,\"UrgentPointer\": 0,\"Window\": 115}}",
+      "{ \"header\": { \"IncLen\": 60,\"OrigLen\": 60,\"TsSec\": 1391743533,\"TsUsec\": 523808},\"ipv4header\": { \"Destination\": 202,\"DestinationAddress\": \"0.0.0.202\",\"Flags\": 0,\"FragmentOffset\": 572,\"HeaderChecksum\": 21631,\"Id\": 2,\"Ihl\": 8,\"Protocol\": 0,\"Source\": -285366020,\"SourceAddress\": \"238.253.168.252\",\"Tos\": 66,\"TotalLength\": 768,\"Ttl\": 128,\"Version\": 4}} ",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729466,\"TsUsec\": 626286},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 28302,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 151031295,\"SourceAddress\": \"9.0.141.255\",\"Tos\": 60,\"TotalLength\": 14875,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729470,\"TsUsec\": 404175},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 53034,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 100699647,\"SourceAddress\": \"6.0.141.255\",\"Tos\": 60,\"TotalLength\": 15899,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729470,\"TsUsec\": 991207},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 59167,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 251694591,\"SourceAddress\": \"15.0.141.255\",\"Tos\": 60,\"TotalLength\": 15899,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743531,\"TsUsec\": 617746},\"ipv4header\": { \"Destination\": -1407317706,\"DestinationAddress\": \"172.30.9.54\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 12015,\"Id\": 41253,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317711,\"SourceAddress\": \"172.30.9.49\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -854627611,\"Checksum\": 28439,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.54\",\"DestinationPort\": 43457,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -70750910,\"SessionKey\": \"172.30.9.49:9092 -> 172.30.9.54:43457\",\"SourceAddress\": \"172.30.9.49\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743532,\"TsUsec\": 78633},\"ipv4header\": { \"Destination\": -1407317706,\"DestinationAddress\": \"172.30.9.54\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 26235,\"Id\": 27034,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317712,\"SourceAddress\": \"172.30.9.48\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 965354559,\"Checksum\": 6890,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.54\",\"DestinationPort\": 37051,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -1654276327,\"SessionKey\": \"172.30.9.48:9092 -> 172.30.9.54:37051\",\"SourceAddress\": \"172.30.9.48\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743634,\"TsUsec\": 784540},\"ipv4header\": { \"Destination\": -1407317710,\"DestinationAddress\": \"172.30.9.50\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 46490,\"Id\": 6784,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317713,\"SourceAddress\": \"172.30.9.47\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -477288801,\"Checksum\": 60687,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.50\",\"DestinationPort\": 53561,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -1890443606,\"SessionKey\": \"172.30.9.47:9092 -> 172.30.9.50:53561\",\"SourceAddress\": \"172.30.9.47\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743683,\"TsUsec\": 495234},\"ipv4header\": { \"Destination\": -1407317711,\"DestinationAddress\": \"172.30.9.49\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 48322,\"Id\": 4956,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317716,\"SourceAddress\": \"172.30.9.44\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -1825947455,\"Checksum\": 27340,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.49\",\"DestinationPort\": 37738,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -496700614,\"SessionKey\": \"172.30.9.44:9092 -> 172.30.9.49:37738\",\"SourceAddress\": \"172.30.9.44\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743772,\"TsUsec\": 719493},\"ipv4header\": { \"Destination\": -1407317715,\"DestinationAddress\": \"172.30.9.45\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 39105,\"Id\": 14173,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317712,\"SourceAddress\": \"172.30.9.48\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 898627232,\"Checksum\": 57115,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.45\",\"DestinationPort\": 45629,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": 1030775351,\"SessionKey\": \"172.30.9.48:9092 -> 172.30.9.45:45629\",\"SourceAddress\": \"172.30.9.48\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}" };
+
+  /** The Constant protoCols. */
+  private static final String[] protoCols = { "TCP", "UDP", "SNMP" };
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see backtype.storm.spout.ISpout#open(java.util.Map,
+   * backtype.storm.task.TopologyContext,
+   * backtype.storm.spout.SpoutOutputCollector)
+   */
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+
+    System.out.println("Opening PcapSimulatorSpout");
+
+    this.collector = collector;
+
+    if (conf.containsKey("storm.topology.pcap.spout.pcap-kafka-simulator-spout.packet.size.in.bytes")) {
+
+      messageSize = Integer.valueOf(conf.get("storm.topology.pcap.spout.pcap-kafka-simulator-spout.packet.size.in.bytes").toString());
+      pcap = new byte[messageSize];
+
+      System.out.println("Using message size : " + messageSize);
+    }
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see backtype.storm.spout.ISpout#nextTuple()
+   */
+  public void nextTuple() {
+
+    // System.out.println("nextTuple of PcapSimulatorSpout");
+    ipAddr.setLength(0);
+    String srcAddr = ipAddr.append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255))
+        .append('.').append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255)).toString();
+    ipAddr.setLength(0);
+    String dstAddr = ipAddr.append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255))
+        .append('.').append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255)).toString();
+
+    String key = PcapUtils.getSessionKey(srcAddr, dstAddr, String.valueOf(randomProtocolGenerator.nextInt(255)),
+        String.valueOf(randomPortGenerator.nextInt(64000)), String.valueOf(randomPortGenerator.nextInt(64000)), "0", "0");
+
+    jsonDoc = jsonDocs[randomJsonGenerator.nextInt(8)];
+    ts = System.currentTimeMillis() + randomPortGenerator.nextInt();
+    randomPcapGenerator.nextBytes(pcap);
+
+    collector.emit(new Values(srcAddr, key.toString(), jsonDoc, ts, pcap));
+
+    collector.emit("pcap_index_stream", new Values(jsonDoc, key));
+    collector.emit("pcap_header_stream", new Values(jsonDoc, key));
+    collector.emit("pcap_data_stream", new Values(key.toString(), ts, pcap));
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology
+   * .OutputFieldsDeclarer)
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    System.out.println("Declaring output fields of PcapSimulatorSpout");
+
+    declarer.declareStream("pcap_index_stream", new Fields("index_json"));
+    declarer.declareStream("pcap_header_stream", new Fields("header_json"));
+    declarer.declareStream("pcap_data_stream", new Fields("pcap_id", "timestamp", "pcap"));
+
+  }
+  
+  @Override
+  public void ack(Object id) {
+  }
+
+  @Override
+  public void fail(Object id) {
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Asa.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Asa.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Asa.java
new file mode 100644
index 0000000..68f0c89
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Asa.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.AsaRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+
+/**
+ * Command-line entry point for the topology that processes Asa messages.
+ */
+public class Asa{
+
+	/**
+	 * Creates an AsaRunner and hands it the command-line arguments together
+	 * with the identifier "asa".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner asaRunner = new AsaRunner();
+		asaRunner.initTopology(args, "asa");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Bro.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Bro.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Bro.java
new file mode 100644
index 0000000..280738c
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Bro.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.BroRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+/**
+ * Command-line entry point for the topology that processes Bro messages.
+ */
+public class Bro{
+
+	/**
+	 * Creates a BroRunner and hands it the command-line arguments together
+	 * with the identifier "bro".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner broRunner = new BroRunner();
+		broRunner.initTopology(args, "bro");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/FireEye.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/FireEye.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/FireEye.java
new file mode 100644
index 0000000..e1f489b
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/FireEye.java
@@ -0,0 +1,21 @@
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+import backtype.storm.generated.InvalidTopologyException;
+import com.opensoc.topology.runner.FireEyeRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+
+/**
+ * Command-line entry point for the topology that processes FireEye syslog
+ * messages.
+ */
+public class FireEye {
+
+	/**
+	 * Creates a FireEyeRunner and hands it the command-line arguments
+	 * together with the identifier "fireeye".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner fireEyeRunner = new FireEyeRunner();
+		fireEyeRunner.initTopology(args, "fireeye");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Ise.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Ise.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Ise.java
new file mode 100644
index 0000000..7bcd0c2
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Ise.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.ISERunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+/**
+ * Command-line entry point for the topology that processes Ise messages.
+ */
+public class Ise{
+
+	/**
+	 * Creates an ISERunner and hands it the command-line arguments together
+	 * with the identifier "ise".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner iseRunner = new ISERunner();
+		iseRunner.initTopology(args, "ise");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Lancope.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Lancope.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Lancope.java
new file mode 100644
index 0000000..c3ecc54
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Lancope.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.LancopeRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+
+/**
+ * Command-line entry point for the topology that processes Lancope messages.
+ */
+public class Lancope{
+
+	/**
+	 * Creates a LancopeRunner and hands it the command-line arguments
+	 * together with the identifier "lancope".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner lancopeRunner = new LancopeRunner();
+		lancopeRunner.initTopology(args, "lancope");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/PaloAltoFirewall.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/PaloAltoFirewall.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/PaloAltoFirewall.java
new file mode 100644
index 0000000..222cc29
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/PaloAltoFirewall.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.AsaRunner;
+import com.opensoc.topology.runner.PaloAltoFirewallRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+
+/**
+ * Command-line entry point for the topology that processes Palo Alto
+ * Firewall syslog messages.
+ */
+public class PaloAltoFirewall {
+
+	/**
+	 * Creates a PaloAltoFirewallRunner and hands it the command-line
+	 * arguments together with the identifier "paloalto".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner paloAltoRunner = new PaloAltoFirewallRunner();
+		paloAltoRunner.initTopology(args, "paloalto");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Pcap.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Pcap.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Pcap.java
new file mode 100644
index 0000000..2532893
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Pcap.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.opensoc.topology.runner.PcapRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+
+/**
+ * Command-line entry point for the topology that processes raw packet
+ * messages.
+ */
+public class Pcap{
+
+	/**
+	 * Creates a PcapRunner and hands it the command-line arguments together
+	 * with the identifier "pcap".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner pcapRunner = new PcapRunner();
+		pcapRunner.initTopology(args, "pcap");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Sourcefire.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Sourcefire.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Sourcefire.java
new file mode 100644
index 0000000..bb8a43f
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/Sourcefire.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import com.opensoc.topology.runner.SourcefireRunner;
+import com.opensoc.topology.runner.TopologyRunner;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+
+/**
+ * Command-line entry point for the topology that processes Sourcefire
+ * messages.
+ */
+public class Sourcefire{
+
+	/**
+	 * Creates a SourcefireRunner and hands it the command-line arguments
+	 * together with the identifier "sourcefire".
+	 *
+	 * @param args command-line arguments, passed through unchanged
+	 */
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		final TopologyRunner sourcefireRunner = new SourcefireRunner();
+		sourcefireRunner.initTopology(args, "sourcefire");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/AsaRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/AsaRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/AsaRunner.java
new file mode 100644
index 0000000..8cc2db7
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/AsaRunner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class AsaRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/AsaOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/BroRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/BroRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/BroRunner.java
new file mode 100644
index 0000000..c448017
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/BroRunner.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class BroRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/BroExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/FireEyeRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/FireEyeRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/FireEyeRunner.java
new file mode 100644
index 0000000..31026df
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/FireEyeRunner.java
@@ -0,0 +1,77 @@
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class FireEyeRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/FireeyeExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+	        
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/ISERunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/ISERunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/ISERunner.java
new file mode 100644
index 0000000..7f377d5
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/ISERunner.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class ISERunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/ISESampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+			
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/LancopeRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/LancopeRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/LancopeRunner.java
new file mode 100644
index 0000000..1031abf
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/LancopeRunner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.parsing.parsers.BasicLancopeParser;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class LancopeRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/LancopeExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PaloAltoFirewallRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PaloAltoFirewallRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PaloAltoFirewallRunner.java
new file mode 100644
index 0000000..0b6adad
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PaloAltoFirewallRunner.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class PaloAltoFirewallRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/PaloaltoOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+	        
+			
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PcapRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PcapRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PcapRunner.java
new file mode 100644
index 0000000..a26a467
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/PcapRunner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.parsing.PcapParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+/**
+ * TopologyRunner specialization that registers a PcapParserBolt as the
+ * parsing bolt and configures the test spout with the PCAP sample file.
+ */
+public class PcapRunner extends TopologyRunner{
+
+	/** File name handed to the test spout. */
+	static String test_file_path = "SampleInput/PCAPExampleOutput";
+
+	/**
+	 * Registers a GenericInternalTestSpout configured with test_file_path;
+	 * repetition, parallelism hint and task count come from the spout.test.*
+	 * configuration keys.
+	 *
+	 * @param name component id under which the spout is registered
+	 * @return always true; any failure is printed and terminates the JVM
+	 */
+	@Override
+	public boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			// Initialization failed: exit non-zero so the failure is visible to the caller.
+			System.exit(1);
+		}
+		return true;
+	}
+
+	/**
+	 * Registers a PcapParserBolt, configured with the bolt.parser.ts.precision
+	 * setting, with a shuffle grouping on the last entry of messageComponents.
+	 *
+	 * @param topology_name not used by this implementation
+	 * @param name component id under which the bolt is registered
+	 * @return always true; any failure is printed and terminates the JVM
+	 */
+	@Override
+	boolean initializeParsingBolt(String topology_name, String name) {
+		try {
+
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			PcapParserBolt pcapParser = new PcapParserBolt().withTsPrecision(config.getString("bolt.parser.ts.precision"));
+
+			builder.setBolt(name, pcapParser,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"))
+					.shuffleGrouping(messageUpstreamComponent);
+		} catch (Exception e) {
+			e.printStackTrace();
+			// Initialization failed: exit non-zero so the failure is visible to the caller.
+			System.exit(1);
+		}
+
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/SourcefireRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/SourcefireRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/SourcefireRunner.java
new file mode 100644
index 0000000..69b4581
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/SourcefireRunner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.topology.runner;
+
+import com.opensoc.filters.GenericMessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+import com.opensoc.parsing.TelemetryParserBolt;
+import com.opensoc.test.spouts.GenericInternalTestSpout;
+
+public class SourcefireRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/SourcefireExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[OpenSOC] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[OpenSOC] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+	        
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[OpenSOC] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}