Posted to commits@metron.apache.org by om...@apache.org on 2015/12/08 07:38:11 UTC

[47/51] [partial] incubator-metron git commit: Initial import of code from https://github.com/OpenSOC/opensoc at ac0b00373f8f56dfae03a8109af5feb373ea598e.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup b/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
new file mode 100644
index 0000000..a122057
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?><!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+     http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+   <parent>
+    <groupId>com.opensoc</groupId>
+    <artifactId>OpenSOC-Streaming</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>OpenSOC-DataLoads</artifactId>
+	<properties>
+		<opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+		<storm.version>0.9.1-incubating</storm.version>
+		<hbase.version>0.98.0-hadoop2</hbase.version>
+	</properties>
+  <dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${opensoc.common.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.version}</version>
+		</dependency>
+  </dependencies>
+  <build>
+    <sourceDirectory>src</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>src</directory>
+        <excludes>
+          <exclude>**/*.java</exclude>
+        </excludes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <excludes>
+                  <exclude>classworlds:classworlds</exclude>
+                  <exclude>junit:junit</exclude>
+                  <exclude>jmock:*</exclude>
+                  <exclude>*:xml-apis</exclude>
+                  <exclude>org.apache.maven:lib:tests</exclude>
+                  <exclude>log4j:log4j:jar:</exclude>
+                  <exclude>*:hbase:*</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java b/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
new file mode 100644
index 0000000..cdf0541
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
@@ -0,0 +1,122 @@
+package com.opensoc.dataloads.cif;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedInputStream;
+
+public class HBaseTableLoad {
+
+	private static Configuration conf = null;
+	private final static String hbaseTable = "cif_table";
+	/**
+	 * Initialization
+	 */
+	static {
+		conf = HBaseConfiguration.create();
+	}
+
+	public static void main(String[] args) {
+
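+		// expects one argument: the root directory of the CIF dump, laid out as
+		// <col_family>_<col_qualifier>/<feed files>.gz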
+		LoadDirHBase(args[0]);
+
+	}
+
+	public static void LoadDirHBase(String dirName) {
+		System.out.println("Working on: " + dirName);
+		File folder = new File(dirName);
+		File[] listOfFiles = folder.listFiles();
+
+		if (listOfFiles == null) {
+			System.out.println("Not a readable directory: " + dirName);
+			return;
+		}
+
+		for (File file : listOfFiles) {
+
+			if (file.isFile() && file.getName().endsWith(".gz")) {
+
+				// e.g. folder name is infrastructure_botnet. Col Qualifier is
+				// botnet and col_family is infrastructure
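+				// e.g. (illustrative) a record {"address":"198.51.100.7"} in that
+				// folder becomes row "198.51.100.7" with cell infrastructure:botnet = "Y"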
+
+				String[] nameParts = folder.getName().split("_");
+				String col_family = nameParts[0];
+				String col_qualifier = nameParts[1];
+
+				// Open gz file
+				try {
+					InputStream input = new BufferedInputStream(
+							new GZIPInputStream(new FileInputStream(file)));
+
+					HBaseBulkPut(input, col_family, col_qualifier);
+
+				} catch (IOException e) {
+					// report the failed file and continue with the rest
+					e.printStackTrace();
+				} catch (ParseException e) {
+					// malformed JSON in this file; report it and continue
+					e.printStackTrace();
+				}
+			} else if (file.isDirectory()) {
+				// recurse into sub-directories
+				LoadDirHBase(file.getAbsolutePath());
+			}
+		}
+	}
+
+	/**
+	 * Inserts one Put per JSON record read from the input stream into the
+	 * "cif_table" HBase table.
+	 * 
+	 * @param input stream of newline-delimited JSON records (already gunzipped)
+	 * @param col_family column family to write, taken from the folder name
+	 * @param col_qualifier column qualifier to write
+	 * @throws IOException
+	 * @throws ParseException
+	 */
+	public static void HBaseBulkPut(InputStream input, String col_family,
+			String col_qualifier) throws IOException, ParseException {
+
+		HTable table = new HTable(conf, hbaseTable);
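+		// note: assumes the "cif_table" table already exists with the matching
+		// column family; HTable does not create tables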
+		JSONParser parser = new JSONParser();
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(input));
+		String jsonString;
+		List<Put> allputs = new ArrayList<Put>();
+		Map json;
+
+		while ((jsonString = br.readLine()) != null) {
+
+			try {
+
+				json = (Map) parser.parse(jsonString);
+			} catch (ParseException e) {
+				//System.out.println("Unable to Parse: " +jsonString);
+				continue;
+			}
+
+			// Get Address - either IP/domain or email and make that the Key
+			Put put = new Put(Bytes.toBytes((String) json.get("address")));
+
+			// We are just adding a "Y" flag to mark this address
+			put.add(Bytes.toBytes(col_family), Bytes.toBytes(col_qualifier),
+					Bytes.toBytes("Y"));
+
+			allputs.add(put);
+		}
+		table.put(allputs);
+		System.out.println("Wrote " + allputs.size() + " records to " + hbaseTable);
+		table.close();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml b/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
new file mode 100644
index 0000000..a73469d
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
@@ -0,0 +1,100 @@
+<!--Tue Feb 11 02:34:08 2014 -->
+<configuration>
+
+	<property>
+		<name>hbase.regionserver.global.memstore.lowerLimit</name>
+		<value>0.38</value>
+	</property>
+	<property>
+		<name>zookeeper.session.timeout</name>
+		<value>30000</value>
+	</property>
+
+	<property>
+		<name>hbase.security.authorization</name>
+		<value>false</value>
+	</property>
+	<property>
+		<name>hbase.cluster.distributed</name>
+		<value>true</value>
+	</property>
+	
+	<property>
+		<name>hbase.hstore.flush.retries.number</name>
+		<value>120</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.block.multiplier</name>
+		<value>4</value>
+	</property>
+	<property>
+		<name>hbase.hstore.blockingStoreFiles</name>
+		<value>200</value>
+	</property>
+	<property>
+		<name>hbase.defaults.for.version.skip</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>hbase.regionserver.global.memstore.upperLimit</name>
+		<value>0.4</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.mslab.enabled</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>hbase.client.keyvalue.maxsize</name>
+		<value>10485760</value>
+	</property>
+	<property>
+		<name>hbase.superuser</name>
+		<value>hbase</value>
+	</property>
+	<property>
+		<name>hfile.block.cache.size</name>
+		<value>0.40</value>
+	</property>
+	<property>
+		<name>zookeeper.znode.parent</name>
+		<value>/hbase-unsecure</value>
+	</property>
+	<property>
+		<name>hbase.hregion.max.filesize</name>
+		<value>10737418240</value>
+	</property>
+	<property>
+		<name>hbase.zookeeper.property.clientPort</name>
+		<value>2181</value>
+	</property>
+	<property>
+		<name>hbase.security.authentication</name>
+		<value>simple</value>
+	</property>
+	<property>
+		<name>hbase.client.scanner.caching</name>
+		<value>100</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.flush.size</name>
+		<value>134217728</value>
+	</property>
+	<property>
+		<name>hbase.hregion.majorcompaction</name>
+		<value>86400000</value>
+	</property>
+
+	<property>
+		<name>hbase.zookeeper.quorum</name>
+		<value>zkpr1</value>
+	</property>
+
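+	<!-- client-side write buffer in bytes (~500 MB); far above HBase's 2 MB
+	     default, presumably sized for bulk loads -->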
+	<property>
+		<name>hbase.client.write.buffer</name>
+		<value>500000000</value>
+	</property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/conf/config.properties b/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
new file mode 100644
index 0000000..7c060fb
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
@@ -0,0 +1,21 @@
+# sample config file for the Alerts/Proxy service.  Fill in
+# the settings below with values appropriate to your environment. 
+
+alertsServiceImpl=elasticsearch
+ldapUrl=ldap://ec2-54-88-217-194.compute-1.amazonaws.com
+searchInitialDelayTime=120
+searchIntervalTime=30
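+# note: the two delay/interval settings above are in seconds; the cache
+# expiration interval below is in milliseconds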
+elasticSearchHostName=localhost
+elasticSearchHostPort=9300
+kafkaBrokerHostName=ec2-example.compute-1.amazonaws.com
+kafkaBrokerHostPort=9092
+kafkaTopicName=test
+kafkaZookeeperHost=ec2-example.compute-1.amazonaws.com
+kafkaZookeeperPort=2181
+kafkaGroupId=abc123
+keystoreFile=./keystore
+keystorePassword=password
+authTokenAlias=authTokenKey
+authTokenMaxAge=10000
+alerts.cache.expiration.interval=360000
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/pom.xml b/opensoc-streaming/OpenSOC-DataServices/pom.xml
new file mode 100644
index 0000000..56b7372
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/pom.xml
@@ -0,0 +1,278 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>com.opensoc</groupId>
+	<artifactId>OpenSOC-DataServices</artifactId>
+	<version>1.0-SNAPSHOT</version>
+
+	<properties>
+		<slf4j.version>1.6.4</slf4j.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>1.3.1</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>
+			<version>1.6</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-server</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-servlet</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>apache-jsp</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-jsp</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-webapp</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-annotations</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty.websocket</groupId>
+			<artifactId>javax-websocket-server-impl</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty.websocket</groupId>
+			<artifactId>websocket-server</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty.websocket</groupId>
+			<artifactId>websocket-common</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty.websocket</groupId>
+			<artifactId>websocket-api</artifactId>
+			<version>9.2.1.v20140609</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty.websocket</groupId>
+			<artifactId>websocket-parent</artifactId>
+			<version>9.2.1.v20140609</version>
+			<type>pom</type>
+		</dependency>
+		<dependency>
+			<groupId>com.google.code.gson</groupId>
+			<artifactId>gson</artifactId>
+			<version>2.2.4</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.inject</groupId>
+			<artifactId>guice</artifactId>
+			<version>3.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.inject.extensions</groupId>
+			<artifactId>guice-servlet</artifactId>
+			<version>3.0</version>
+		</dependency>
+		<dependency>
+			<groupId>javax.annotation</groupId>
+			<artifactId>jsr250-api</artifactId>
+			<version>1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-jaxrs</artifactId>
+			<version>3.0.7.Final</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>httpclient</artifactId>
+					<groupId>org.apache.httpcomponents</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>jboss-annotations-api_1.1_spec</artifactId>
+					<groupId>org.jboss.spec.javax.annotation</groupId>
+				</exclusion>
+				<exclusion>
+					<artifactId>jcip-annotations</artifactId>
+					<groupId>net.jcip</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-guice</artifactId>
+			<version>3.0.7.Final</version>
+		</dependency>
+
+		<!-- Logging (SLF4J) and Apache Shiro -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.6.4</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.shiro</groupId>
+			<artifactId>shiro-core</artifactId>
+			<version>1.2.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.shiro</groupId>
+			<artifactId>shiro-web</artifactId>
+			<version>1.2.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.shiro</groupId>
+			<artifactId>shiro-guice</artifactId>
+			<version>1.2.3</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+		<dependency>
+			<!-- Required in the sample apps only for 3rd-party libraries that expect 
+				to call the commons logging APIs -->
+			<groupId>org.slf4j</groupId>
+			<artifactId>jcl-over-slf4j</artifactId>
+			<version>${slf4j.version}</version>
+		</dependency>
+
+		<!-- Kafka -->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.9.2</artifactId>
+			<version>0.8.1.1</version>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- TESTS -->
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-client</artifactId>
+			<version>3.0.7.Final</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<artifactId>httpclient</artifactId>
+			<groupId>org.apache.httpcomponents</groupId>
+			<type>jar</type>
+			<version>4.3.3</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>1.9.5</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<inherited>true</inherited>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<archive>
+						<manifest>
+							<addClasspath>true</addClasspath>
+							<mainClass>com.opensoc.dataservices.Main</mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<configuration>
+					<archive>
+						<manifest>
+							<mainClass>com.opensoc.dataservices.Main</mainClass>
+						</manifest>
+					</archive>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id> <!-- this is used for inheritance merges -->
+						<phase>package</phase> <!-- bind to the packaging phase -->
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>exec-maven-plugin</artifactId>
+				<version>1.2.1</version>
+				<configuration>
+					<mainClass>com.opensoc.dataservices.Main</mainClass>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
new file mode 100644
index 0000000..d150833
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
@@ -0,0 +1,45 @@
+package com.opensoc.alerts.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.inject.Inject;
+
+public class AlertsCacheReaper implements Runnable {
+	
+	private AlertsSearcher searcher;
+	
+	@Inject
+	private Properties configProps;
+	
+	public void setSearcher(AlertsSearcher searcher) {
+		this.searcher = searcher;
+	}
+	
+	@Override
+	public void run() {
+	
+		long expireAlertsCacheInterval = Long.parseLong( configProps.getProperty( "alerts.cache.expiration.interval", "360000" ) );
+		long timeNow = System.currentTimeMillis();
+		
+		long cutOffTime = timeNow - expireAlertsCacheInterval;
+		
+		List<String> forRemoval = new ArrayList<String>(); 
+		
+		for( Map.Entry<String, AlertsFilterCacheEntry> entry : searcher.alertsFilterCache.entrySet()  )
+		{
+			// entries stored before the cut-off have expired and are queued for removal
+			if( entry.getValue().storedAtTime < cutOffTime )
+			{
+				forRemoval.add(entry.getKey());
+			}
+		}
+		
+		for( String key : forRemoval )
+		{
+			searcher.alertsFilterCache.remove(key);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
new file mode 100644
index 0000000..7a0f687
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
@@ -0,0 +1,17 @@
+package com.opensoc.alerts.server;
+
+public class AlertsFilterCacheEntry {
+	
+	public String sourceData;
+	public long storedAtTime;
+	
+	public AlertsFilterCacheEntry(String sourceData, long timeNow) {
+		this.sourceData = sourceData;
+		this.storedAtTime = timeNow;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
new file mode 100644
index 0000000..3cf6246
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
@@ -0,0 +1,44 @@
+package com.opensoc.alerts.server;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class AlertsProcessingServer {
+	
+	private static final Logger logger = LoggerFactory.getLogger( AlertsProcessingServer.class );
+	
+	private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+	
+	@Inject
+	private AlertsSearcher searcher;
+	@Inject
+	private AlertsCacheReaper reaper;
+	@Inject
+	private Properties configProps;
+	
+	public void startProcessing() {
+		
+		logger.debug( "startProcessing() invoked" );
+		
+		int initialDelayTime = Integer.parseInt( configProps.getProperty( "searchInitialDelayTime", "30" ) );
+		int searchIntervalTime = Integer.parseInt( configProps.getProperty( "searchIntervalTime", "30" ) );
+		
+		reaper.setSearcher(searcher);
+		
+		scheduler.scheduleAtFixedRate( searcher, initialDelayTime, searchIntervalTime, SECONDS );
+		
+		scheduler.scheduleAtFixedRate( reaper, 120, 380, SECONDS );
+		
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
new file mode 100644
index 0000000..15db704
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
@@ -0,0 +1,237 @@
+package com.opensoc.alerts.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class AlertsSearcher implements Runnable {
+
+	private static final Logger logger = LoggerFactory.getLogger( AlertsSearcher.class );
+	
+	@Inject
+	private Properties configProps;
+	
+	// TODO: inject Searcher module for either ElasticSearch or Solr...
+	// TODO: inject OpenSocServiceFactory here
+	
+	// shared with AlertsCacheReaper on another scheduler thread, so use a concurrent map
+	public final Map<String, AlertsFilterCacheEntry> alertsFilterCache = new ConcurrentHashMap<String, AlertsFilterCacheEntry>();
+	MessageDigest md;
+	
+	public AlertsSearcher() throws NoSuchAlgorithmException
+	{
+		md = MessageDigest.getInstance("SHA-256");
+
+	}
+	
+	@Override
+	public void run() {
+
+		try
+		{
+			logger.debug( "Doing Elastic Search search..." );
+			
+			long currentSearchTime = System.currentTimeMillis();
+			long lastSearchTime = 0L;
+			
+			// look for a marker that tells us the last time we ran...
+			String homeDir = configProps.getProperty("homeDir");
+			if( homeDir.endsWith( "/" )) {
+				homeDir = homeDir.substring(0, homeDir.length()-1);
+			}
+			
+			logger.info( "using homeDir = " + homeDir );
+			
+			File searcherStateFile = new File( homeDir + "/searcherState.properties" );
+			if( searcherStateFile.exists() )
+			{
+				logger.info( "found existing searcherState.properties file" );
+				FileInputStream fis = null;
+				try {
+					fis = new FileInputStream( searcherStateFile );
+					Properties searcherState = new Properties();
+					searcherState.load(fis);
+					lastSearchTime = Long.parseLong( searcherState.getProperty("lastSearchTime"));
+				}
+				catch( FileNotFoundException e ) {
+					logger.error( "Error locating lastSearchTime value from state file", e );
+					
+				} catch (IOException e) {
+					logger.error( "Error locating lastSearchTime value from state file", e );
+				}
+				finally
+				{
+					try {
+						if( fis != null ) {
+							fis.close();
+						}
+					} catch (IOException e) {
+						logger.error( "Probably ignorable error closing file stream: ", e );
+					}
+				}
+			}
+			else
+			{
+				// nothing to do here.  We'll write out our lastSearchTime at the end
+				logger.info( "No existing searcherState.properties found" );
+			}
+			
+			// search for alerts newer than "lastSearchTime" 
+			Settings settings = ImmutableSettings.settingsBuilder()
+					.put("client.transport.sniff", true).build();
+			        // .put("cluster.name", "elasticsearch").build();
+	
+			Client client = null;
+			try
+			{
+				logger.info( "initializing elasticsearch client" );
+				
+				String elasticSearchHostName = configProps.getProperty( "elasticSearchHostName", "localhost" );
+				int elasticSearchHostPort = Integer.parseInt(configProps.getProperty( "elasticSearchHostPort", "9300" ) );
+				client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(elasticSearchHostName, elasticSearchHostPort));
+					
+				logger.info( "lastSearchTime: " + lastSearchTime );
+				
+				String alertsIndexes = configProps.getProperty( "alertsIndexes", "alerts" );
+				String[] alertsIndexArray = alertsIndexes.split(",");
+				
+				String alertsTypes = configProps.getProperty( "alertsTypes", "" );
+				String[] alertsTypesArray = alertsTypes.split( ",");
+				
+				String alertsQueryFieldName = configProps.getProperty( "alertQueryFieldName", "alert.source" );
+				String alertsQueryFieldValue = configProps.getProperty( "alertsQueryFieldValue", "*" );
+				
+				logger.info( "alertsIndexes: " + alertsIndexes );
+				
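+				// bool query: wildcard match on the configured alert field AND a
+				// timestamp in [lastSearchTime, now)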
+				SearchResponse response = client.prepareSearch( alertsIndexArray )
+				.setTypes( alertsTypesArray )
+				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+				.addField("_source")		
+				.setQuery( QueryBuilders.boolQuery().must(  QueryBuilders.wildcardQuery( alertsQueryFieldName, alertsQueryFieldValue ) )
+													.must( QueryBuilders.rangeQuery("message.timestamp").from(lastSearchTime).to(System.currentTimeMillis()).includeLower(true).includeUpper(false)))
+				.execute()
+				.actionGet();
+				
+				SearchHits hits = response.getHits();
+				logger.debug( "Total hits: " + hits.getTotalHits());
+
+				
+				// for all hits, put the alert onto the Kafka topic.
+				for( SearchHit hit : hits )
+				{
+					// calculate hash for this hit...
+					String sourceData = hit.getSourceAsString();
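+					// note: raw digest bytes used as a String key are charset-dependent;
+					// hex- or Base64-encoding the digest would be a safer cache key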
+					String hash = new String( md.digest(sourceData.getBytes()));
+					
+					if( alertsFilterCache.containsKey(hash))
+					{
+						logger.debug( "We have already seen this Alert, so skipping..." );
+						continue;
+					}
+					else
+					{
+						long timeNow = System.currentTimeMillis();
+						AlertsFilterCacheEntry cacheEntry = new AlertsFilterCacheEntry( sourceData, timeNow );
+						alertsFilterCache.put(hash, cacheEntry);
+					}
+					
+					doSenderWork(hit);
+				}
+			
+			}
+			finally
+			{			
+				if( client != null )
+				{
+					client.close();
+				}
+			}		
+			
+			// record the time we just searched
+			FileOutputStream fos = null;
+			try {
+				fos = new FileOutputStream( searcherStateFile );
+				Properties searcherState = new Properties();
+				searcherState.setProperty( "lastSearchTime", Long.toString(currentSearchTime));
+				searcherState.store(fos, "");
+				
+			}
+			catch(  FileNotFoundException e ) {
+				logger.error( "Error saving lastSearchTime: ", e );
+			} catch (IOException e) {
+				logger.error( "Error saving lastSearchTime: ", e );
+			}
+			finally {
+				
+				try {
+					if( fos != null ) {
+						fos.close();
+					}
+				} 
+				catch (IOException e) {
+					logger.error( "Probably ignorable error closing file stream: ", e );
+				}
+			}
+			
+			logger.info( "Done with ElasticSearch search... " );
+		}
+		catch( Exception e )
+		{
+			logger.error( "Unexpected error while searching ElasticSearch index:", e );
+		}
+	}
+	
+	private void doSenderWork( SearchHit hit )
+	{	
+		String kafkaBrokerHostName = configProps.getProperty("kafkaBrokerHostName", "localhost" );
+		String kafkaBrokerHostPort = configProps.getProperty("kafkaBrokerHostPort", "9092" );
+		String kafkaTopicName = configProps.getProperty("kafkaTopicName", "test" );
+		
+		logger.debug( "kafkaBrokerHostName: " + kafkaBrokerHostName );
+		logger.debug( "kafkaBrokerHostPort: " + kafkaBrokerHostPort );
+		logger.debug( "kafkaTopicName: " + kafkaTopicName );
+		
+		String sourceData = hit.getSourceAsString();
+		
+		logger.debug( "Source Data: " + sourceData );
+		Properties props = new Properties();
+		 
+		props.put("metadata.broker.list", kafkaBrokerHostName + ":" + kafkaBrokerHostPort );
+		props.put("serializer.class", "kafka.serializer.StringEncoder");
+		// props.put("partitioner.class", "example.producer.SimplePartitioner");
+		props.put("request.required.acks", "1");
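+		// request.required.acks=1: the leader broker acks each message before the
+		// send is considered successful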
+		 
+		ProducerConfig config = new ProducerConfig(props);
+		
+		Producer<String, String> producer = new Producer<String, String>(config);
+		
+		KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaTopicName, "", sourceData );
+		 
+		// close the producer after each send so broker connections are not leaked
+		try {
+			producer.send(data);
+		}
+		finally {
+			producer.close();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
new file mode 100644
index 0000000..f34dc50
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
@@ -0,0 +1,288 @@
+package com.opensoc.dataservices;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Properties;
+
+import javax.servlet.DispatcherType;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.jasper.servlet.JspServlet;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.shiro.web.env.EnvironmentLoaderListener;
+import org.apache.tomcat.InstanceManager;
+import org.apache.tomcat.SimpleInstanceManager;
+import org.eclipse.jetty.annotations.ServletContainerInitializersStarter;
+import org.eclipse.jetty.apache.jsp.JettyJasperInitializer;
+import org.eclipse.jetty.plus.annotation.ContainerInitializer;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.jboss.resteasy.plugins.guice.GuiceResteasyBootstrapServletContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.servlet.GuiceFilter;
+import com.opensoc.alerts.server.AlertsProcessingServer;
+import com.opensoc.dataservices.modules.guice.AlertsServerModule;
+import com.opensoc.dataservices.modules.guice.DefaultServletModule;
+import com.opensoc.dataservices.modules.guice.DefaultShiroWebModule;
+
+public class Main {
+	
+	static int port = 9091;
+	
+	private static final String WEBROOT_INDEX = "/webroot/";
+	
+	private static final Logger logger = LoggerFactory.getLogger( Main.class );
+	
+    public static void main(String[] args) throws Exception {
+
+
+    	Options options = new Options();
+    	
+    	options.addOption( "homeDir", true, "Home directory for the service" );
+    	
+    	CommandLineParser parser = new GnuParser();
+    	CommandLine cmd = parser.parse( options, args);
+    	
+    	Properties configProps = new Properties();
+    	
+    	String homeDir = cmd.getOptionValue("homeDir");
+    	
+    	if( homeDir == null )
+    	{
+    		System.err.println( "Missing required option: -homeDir" );
+    		System.exit(1);
+    	}
+    	
+    	if( homeDir.endsWith( "/" ))
+    	{
+    		homeDir = homeDir.substring(0, homeDir.length()-1);
+    	}
+
+
+    	DOMConfigurator.configure( homeDir + "/log4j.xml" );
+    	
+    	logger.warn( "DataServices Server starting..." );
+    	
+    	
+    	File configFile = new File( homeDir + "/config.properties" );
+    	FileReader configFileReader = new FileReader( configFile );
+    	try
+    	{
+    		configProps.load(configFileReader);
+    		
+    		Option[] cmdOptions = cmd.getOptions();
+    		for( Option opt : cmdOptions )
+    		{
+    			String argName = opt.getOpt();
+    			String argValue = opt.getValue();
+	
+    			configProps.put(argName, argValue);
+    		}
+    		
+    	}
+    	finally
+    	{
+    		if( configFileReader != null )
+    		{
+    			configFileReader.close();
+    		}
+    	}
+    	
+        WebAppContext context = new WebAppContext();
+    	
+    	Injector injector = Guice.createInjector(   new DefaultServletModule(configProps), 
+    												new AlertsServerModule(configProps),
+    												new DefaultShiroWebModule(configProps, context.getServletContext()), 
+    												new AbstractModule() {
+			
+														@Override
+														protected void configure() {
+															binder().requireExplicitBindings();
+															bind(GuiceFilter.class);
+															bind( GuiceResteasyBootstrapServletContextListener.class );
+															bind( EnvironmentLoaderListener.class );
+										
+														}
+													}
+    											);
+
+    	
+        injector.getAllBindings();
+        injector.createChildInjector().getAllBindings();
+
+        Server server = new Server(port);
+        
+        /***************************************************
+         *************** enable SSL ************************
+         ***************************************************/
+        
+        // HTTP Configuration
+        HttpConfiguration http_config = new HttpConfiguration();
+        http_config.setSecureScheme("https");
+        http_config.setSecurePort(8443);
+        http_config.setOutputBufferSize(32768);
+        http_config.setRequestHeaderSize(8192);
+        http_config.setResponseHeaderSize(8192);
+        http_config.setSendServerVersion(true);
+        http_config.setSendDateHeader(false);
+        // httpConfig.addCustomizer(new ForwardedRequestCustomizer())
+        // SSL Context Factory
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        
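+        // the keystore referenced below can be created ahead of time, e.g.
+        // (illustrative): keytool -genkeypair -keyalg RSA -keystore <homeDir>/keystore -storepass <password>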
+        String sslKeystorePath = configProps.getProperty( "sslKeystorePath", "/keystore" );
+        logger.debug( "sslKeystorePath: " + sslKeystorePath );
+        sslContextFactory.setKeyStorePath( homeDir + sslKeystorePath );
+        
+        String sslKeystorePassword = configProps.getProperty( "sslKeystorePassword" );
+        sslContextFactory.setKeyStorePassword(sslKeystorePassword);
+        
+        String sslKeyManagerPassword = configProps.getProperty( "sslKeyManagerPassword" );
+        if( sslKeyManagerPassword != null && !sslKeyManagerPassword.isEmpty() )
+        {
+        	sslContextFactory.setKeyManagerPassword(sslKeyManagerPassword);
+        }
+        
+        String sslTruststorePath = configProps.getProperty( "sslTruststorePath" );
+        if( sslTruststorePath != null && !sslTruststorePath.isEmpty() )
+        {
+        	sslContextFactory.setTrustStorePath( homeDir + sslTruststorePath );
+        }
+        
+        String sslTruststorePassword = configProps.getProperty( "sslTruststorePassword" );
+        if( sslTruststorePassword != null && !sslTruststorePassword.isEmpty())
+        {
+        	sslContextFactory.setTrustStorePassword( sslTruststorePassword );
+        }
+        
+        sslContextFactory.setExcludeCipherSuites(
+                "SSL_RSA_WITH_DES_CBC_SHA",
+                "SSL_DHE_RSA_WITH_DES_CBC_SHA",
+                "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+                "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+                "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA");
+
+        // SSL HTTP Configuration
+        HttpConfiguration https_config = new HttpConfiguration(http_config);
+        https_config.addCustomizer(new SecureRequestCustomizer());
+
+        // SSL Connector
+        ServerConnector sslConnector = new ServerConnector(server,
+            new SslConnectionFactory(sslContextFactory,"http/1.1"),
+            new HttpConnectionFactory(https_config));
+        sslConnector.setPort(8443);
+        server.addConnector(sslConnector);        
+        
+
+        FilterHolder guiceFilter = new FilterHolder(injector.getInstance(GuiceFilter.class));
+        
+        
+        /** For JSP support.  Used only for testing and debugging for now.  This can come out
+         * once the real consumers for this service are in place.
+         */
+        URL indexUri = Main.class.getResource(WEBROOT_INDEX);
+        if (indexUri == null)
+        {
+            throw new FileNotFoundException("Unable to find resource " + WEBROOT_INDEX);
+        }
+
+        // Points to wherever /webroot/ (the resource) is
+        URI baseUri = indexUri.toURI();        
+        
+        // Establish Scratch directory for the servlet context (used by JSP compilation)
+        File tempDir = new File(System.getProperty("java.io.tmpdir"));
+        File scratchDir = new File(tempDir.toString(),"embedded-jetty-jsp");
+
+        if (!scratchDir.exists())
+        {
+            if (!scratchDir.mkdirs())
+            {
+                throw new IOException("Unable to create scratch directory: " + scratchDir);
+            }
+        }        
+        
+        // Set JSP to use Standard JavaC always
+        System.setProperty("org.apache.jasper.compiler.disablejsr199","false");	        
+        
+        context.setAttribute("javax.servlet.context.tempdir",scratchDir);
+        context.setAttribute(InstanceManager.class.getName(), new SimpleInstanceManager());
+        
+        //Ensure the jsp engine is initialized correctly
+        JettyJasperInitializer sci = new JettyJasperInitializer();
+        ServletContainerInitializersStarter sciStarter = new ServletContainerInitializersStarter(context);
+        ContainerInitializer initializer = new ContainerInitializer(sci, null);
+        List<ContainerInitializer> initializers = new ArrayList<ContainerInitializer>();
+        initializers.add(initializer);
+
+        context.setAttribute("org.eclipse.jetty.containerInitializers", initializers);
+        context.addBean(sciStarter, true);        
+        
+        // Set Classloader of Context to be sane (needed for JSTL)
+        // JSP requires a non-System classloader, this simply wraps the
+        // embedded System classloader in a way that makes it suitable
+        // for JSP to use
+        // new URL( "file:///home/prhodes/.m2/repository/javax/servlet/jsp/javax.servlet.jsp-api/2.3.1/javax.servlet.jsp-api-2.3.1.jar" ) 
+        ClassLoader jspClassLoader = new URLClassLoader(new URL[] {}, Thread.currentThread().getContextClassLoader());
+        context.setClassLoader(jspClassLoader);
+
+        // Add JSP Servlet (must be named "jsp")
+        ServletHolder holderJsp = new ServletHolder("jsp",JspServlet.class);
+        holderJsp.setInitOrder(0);
+        holderJsp.setInitParameter("logVerbosityLevel","DEBUG");
+        holderJsp.setInitParameter("fork","false");
+        holderJsp.setInitParameter("xpoweredBy","false");
+        holderJsp.setInitParameter("compilerTargetVM","1.7");
+        holderJsp.setInitParameter("compilerSourceVM","1.7");
+        holderJsp.setInitParameter("keepgenerated","true");
+        context.addServlet(holderJsp,"*.jsp");
+        //context.addServlet(holderJsp,"*.jspf");
+        //context.addServlet(holderJsp,"*.jspx");
+
+        // Add Default Servlet (must be named "default")
+        ServletHolder holderDefault = new ServletHolder("default",DefaultServlet.class);
+        holderDefault.setInitParameter("resourceBase",baseUri.toASCIIString());
+        holderDefault.setInitParameter("dirAllowed","true");
+        context.addServlet(holderDefault,"/");         
+        
+        /** end "for JSP support */
+        
+
+        context.setResourceBase(baseUri.toASCIIString());
+        
+        context.setInitParameter("resteasy.guice.modules", "com.opensoc.dataservices.modules.guice.RestEasyModule");
+        context.setInitParameter("resteasy.servlet.mapping.prefix", "/rest");
+        
+        context.addEventListener(injector.getInstance(GuiceResteasyBootstrapServletContextListener.class));
+        context.addFilter(guiceFilter, "/*", EnumSet.allOf(DispatcherType.class));
+        
+        server.setHandler(context);
+        server.start();
+        
+        AlertsProcessingServer alertsServer = injector.getInstance(AlertsProcessingServer.class);
+        
+        alertsServer.startProcessing();
+        
+        server.join();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
new file mode 100644
index 0000000..5d4fe68
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
@@ -0,0 +1,187 @@
+package com.opensoc.dataservices.auth;
+
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.util.Properties;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthToken {
+
+	private static final Logger logger = LoggerFactory.getLogger( AuthToken.class );
+	
+	public static String generateToken( final Properties configProps ) throws Exception
+	{
+		
+		KeyStore ks = KeyStore.getInstance("JCEKS");
+		String keystoreFile = configProps.getProperty( "keystoreFile" );
+		logger.info( "keystoreFile: " + keystoreFile );
+		
+		String keystorePassword = configProps.getProperty( "keystorePassword" );
+		// (keystore password intentionally not logged)
+		
+		String keystoreAlias = configProps.getProperty( "authTokenAlias" );
+		logger.info( "keystoreAlias: " + keystoreAlias );
+		
+		FileInputStream fis = null;
+		try {
+			fis = new FileInputStream( keystoreFile );
+			ks.load(fis, keystorePassword.toCharArray() );
+		}
+		catch( Exception e )
+		{
+			logger.error( "Error opening keyfile:", e );
+			throw e;
+		}
+		finally {
+			if( fis != null ) {
+				fis.close();
+			}
+		}
+		
+		KeyStore.ProtectionParameter protParam =
+		        new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+		KeyStore.SecretKeyEntry  secretKeyEntry = (KeyStore.SecretKeyEntry)ks.getEntry(keystoreAlias, protParam);
+		
+		SecretKey key = secretKeyEntry.getSecretKey();
+
+		
+		Cipher cipher = Cipher.getInstance("AES");
+		cipher.init(Cipher.ENCRYPT_MODE, key);		
+		String tokenString = "OpenSOC_AuthToken:" + System.currentTimeMillis();
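+		// cleartext form, e.g. "OpenSOC_AuthToken:1417968000000"; validateToken()
+		// parses the timestamp back out to enforce authTokenMaxAge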
+		
+		byte[] encryptedData = cipher.doFinal(tokenString.getBytes());	
+		
+		String base64Token = new String( Base64.encodeBase64(encryptedData) );
+		
+		// System.out.println( "base64Token: " + base64Token );
+		
+		return base64Token;
+		
+	}
+	
+	public static boolean validateToken( final Properties configProps, String authToken ) throws Exception
+	{
+		KeyStore ks = KeyStore.getInstance("JCEKS");
+		String keystoreFile = configProps.getProperty( "keystoreFile" );
+		String keystorePassword = configProps.getProperty( "keystorePassword" );
+		String keystoreAlias = configProps.getProperty( "authTokenAlias" );
+		long tokenMaxAgeInMilliseconds = Long.parseLong( configProps.getProperty( "authTokenMaxAge", "600000" ));
+		
+		FileInputStream fis = null;
+		try {
+			fis = new FileInputStream( keystoreFile );
+			ks.load(fis, keystorePassword.toCharArray() );
+		}
+		finally {
+			if( fis != null) {
+				fis.close();
+			}
+		}
+		
+		KeyStore.ProtectionParameter protParam =
+		        new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+		KeyStore.SecretKeyEntry  secretKeyEntry = (KeyStore.SecretKeyEntry)ks.getEntry(keystoreAlias, protParam);
+		
+		SecretKey key = secretKeyEntry.getSecretKey();
+		
+		Cipher cipher = Cipher.getInstance("AES");
+		cipher.init(Cipher.DECRYPT_MODE, key);		
+		
+		byte[] encryptedBytes = Base64.decodeBase64(authToken);
+		
+		byte[] unencryptedBytes = cipher.doFinal(encryptedBytes);
+		String clearTextToken = new String( unencryptedBytes );
+		
+		logger.debug( "clearTextToken: " + clearTextToken );
+		String[] tokenParts = clearTextToken.split( ":" );
+		
+		if( !tokenParts[0].equals( "OpenSOC_AuthToken" ))
+		{
+			return false;
+		}
+		
+		long now = System.currentTimeMillis();
+		long tokenTime = Long.parseLong(tokenParts[1]);
+		
+		// valid only within authTokenMaxAge of the token's creation time
+		return now <= (tokenTime + tokenMaxAgeInMilliseconds);
+		
+	}
+	
+	public static void main( String[] args ) throws Exception
+	{
+		
+    	Options options = new Options();
+    	
+    	options.addOption( "keystoreFile", true, "Keystore File" );
+    	options.addOption( "keystorePassword", true, "Keystore Password" );
+    	options.addOption( "authTokenAlias", true, "Alias under which the AES key is stored" );
+    	
+    	CommandLineParser parser = new GnuParser();
+    	CommandLine cmd = parser.parse( options, args);
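+    	
+    	// example invocation (illustrative), matching the sample config.properties:
+    	//   java com.opensoc.dataservices.auth.AuthToken -keystoreFile ./keystore \
+    	//        -keystorePassword password -authTokenAlias authTokenKey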
+		
+		
+		try
+		{
+			KeyStore ks = KeyStore.getInstance("JCEKS");
+
+			String keystorePassword = cmd.getOptionValue("keystorePassword");
+			String keystoreFile = cmd.getOptionValue("keystoreFile");
+			String authTokenAlias = cmd.getOptionValue("authTokenAlias");
+
+			ks.load(null, keystorePassword.toCharArray());
+
+			
+			// generate a key and store it in the keystore...
+			KeyGenerator keyGen = KeyGenerator.getInstance("AES");
+			SecretKey key = keyGen.generateKey();
+			
+			KeyStore.ProtectionParameter protParam =
+			        new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+			
+			
+			KeyStore.SecretKeyEntry skEntry =
+			        new KeyStore.SecretKeyEntry(key);
+			
+			ks.setEntry(authTokenAlias, skEntry, protParam);
+			
+			java.io.FileOutputStream fos = null;
+		    try {
+		        
+		    	fos = new java.io.FileOutputStream(keystoreFile);
+		        ks.store(fos, keystorePassword.toCharArray());
+		    } 
+		    finally {
+		        
+		    	if (fos != null) {
+		            fos.close();
+		        }
+		    }
+			
+		    
+		    System.out.println( "done" );
+		    
+		}
+		catch( Exception e )
+		{
+			e.printStackTrace();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
new file mode 100644
index 0000000..103a4cc
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
@@ -0,0 +1,15 @@
+package com.opensoc.dataservices.auth;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.ws.rs.NameBinding;
+
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(value = RetentionPolicy.RUNTIME)
+@NameBinding
+public @interface AuthTokenFilter {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
new file mode 100644
index 0000000..d7bffb2
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
@@ -0,0 +1,34 @@
+package com.opensoc.dataservices.auth;
+
+import javax.naming.NamingException;
+
+import org.apache.shiro.authc.AuthenticationInfo;
+import org.apache.shiro.authc.AuthenticationToken;
+import org.apache.shiro.authc.UsernamePasswordToken;
+import org.apache.shiro.realm.activedirectory.ActiveDirectoryRealm;
+import org.apache.shiro.realm.ldap.LdapContextFactory;
+
+public class CustomDomainADRealm extends ActiveDirectoryRealm {
+
+	private String customDomain;
+	
+	public void setCustomDomain(String customDomain) {
+		this.customDomain = customDomain;
+	}
+	
+	public String getCustomDomain() {
+		return customDomain;
+	}
+	
+	@Override
+	protected AuthenticationInfo queryForAuthenticationInfo(
+			AuthenticationToken token, LdapContextFactory ldapContextFactory)
+			throws NamingException {
+	
+		UsernamePasswordToken upToken = (UsernamePasswordToken)token;
+		String userName = upToken.getUsername();
+		upToken.setUsername( userName + "@" + customDomain );
+		
+		return super.queryForAuthenticationInfo(token, ldapContextFactory);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
new file mode 100644
index 0000000..03e2c48
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
@@ -0,0 +1,57 @@
+package com.opensoc.dataservices.auth;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Cookie;
+import javax.ws.rs.ext.Provider;
+
+import org.jboss.resteasy.core.Headers;
+import org.jboss.resteasy.core.ServerResponse;
+
+import com.google.inject.Inject;
+
+@AuthTokenFilter
+@Provider
+public class RestSecurityInterceptor implements javax.ws.rs.container.ContainerRequestFilter {
+
+	private static final ServerResponse ACCESS_DENIED = new ServerResponse("Access denied for this resource", 401, new Headers<Object>());
+	
+	@Inject
+	private Properties configProps;
+	
+	@Override
+	public void filter(ContainerRequestContext requestContext) throws IOException {
+		
+		// get our token...		
+		Map<String, Cookie> cookies = requestContext.getCookies();
+		
+		Cookie authTokenCookie = cookies.get( "authToken" );
+		if( authTokenCookie == null )
+		{
+			requestContext.abortWith(ACCESS_DENIED );
+			return;			
+		}
+		
+		String authToken = authTokenCookie.getValue();
+		try {
+			
+			if( ! AuthToken.validateToken(configProps, authToken) )
+			{
+				requestContext.abortWith(ACCESS_DENIED );
+				return;	
+			}
+		} 
+		catch (Exception e) {
+
+			logger.error( "Exception while validating auth token", e );
+			requestContext.abortWith(ACCESS_DENIED );
+			return;
+		}
+	
+		// if the token is good, just return...
+		
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
new file mode 100644
index 0000000..27b4cbf
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
@@ -0,0 +1,27 @@
+package com.opensoc.dataservices.common;
+
+public interface OpenSOCService {
+
+	// secure service that fronts Elasticsearch or Solr
+	// and the message broker
+	
+    public String identify();
+    public boolean init(String topicname);
+    public boolean login();
+    
+    // standing query operations
+    public boolean registerRulesFromFile();
+    public boolean registerRules();
+    public String viewRules();
+    public boolean editRules();
+    public boolean deleteRules();
+    
+    // register for writing to a Kafka topic
+    public boolean registerForAlertsTopic(String topicname);
+    
+    // client registers for alerts
+    public String receiveAlertAll();
+    public String receiveAlertLast();
+    public boolean disconnectFromAlertsTopic(String topicname);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
new file mode 100644
index 0000000..7874f19
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
@@ -0,0 +1,86 @@
+package com.opensoc.dataservices.kafkaclient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
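+/**
+ * High-level Kafka consumer client: run() launches one KafkaConsumer thread
+ * per stream and relays every message to the given websocket RemoteEndpoint.
+ */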
+public class KafkaClient 
+{
+	private static final Logger logger = LoggerFactory.getLogger( KafkaClient.class );
+	
+	private final ConsumerConnector consumer;
+    private final String topic;
+    private  ExecutorService executor;	
+	private RemoteEndpoint remote;
+       
+    public KafkaClient(String zooKeeper, String groupId, String topic, RemoteEndpoint remote) 
+    {
+        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+                createConsumerConfig(zooKeeper, groupId ));
+        
+        this.topic = topic;
+        this.remote = remote;
+	}
+
+	private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", a_zookeeper);
+        props.put("group.id", a_groupId);
+        props.put("zookeeper.session.timeout.ms", "1000");
+        props.put("zookeeper.sync.time.ms", "1000");
+        props.put("auto.commit.interval.ms", "1000");
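+        // "smallest" replays from the earliest available offset when the group has no committed offset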
+        props.put("auto.offset.reset", "smallest");
+        
+        return new ConsumerConfig(props);
+    }    
+    
+    public void shutdown() {
+    	
+    	logger.info( "Client shutdown() method invoked" );
+    	
+        if (consumer != null) { 
+        	consumer.shutdown();
+        }
+        
+        if (executor != null) { 
+        	executor.shutdown(); 
+        }
+    }    
+    
+    public void run(int a_numThreads) {
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+ 
+        logger.debug( "streams.size = " + streams.size() );
+        
+        // now launch all the threads
+        //
+        executor = Executors.newFixedThreadPool(a_numThreads);
+
+        // now create an object to consume the messages
+        //
+        int threadNumber = 0;
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
+            executor.submit(new KafkaConsumer(this.remote, stream, threadNumber));
+            threadNumber++;
+        }
+    }   	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
new file mode 100644
index 0000000..0e01f1d
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
@@ -0,0 +1,51 @@
+package com.opensoc.dataservices.kafkaclient;
+
+import java.io.IOException;
+
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+
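+/**
+ * Runnable that drains a single KafkaStream, forwarding each message to the
+ * websocket RemoteEndpoint as a string.
+ */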
+public class KafkaConsumer implements Runnable 
+{
+	private static final Logger logger = LoggerFactory.getLogger( KafkaConsumer.class );
+
+	private KafkaStream<byte[], byte[]> m_stream;
+    private int m_threadNumber;
+    private RemoteEndpoint remote;
+    
+    public KafkaConsumer( RemoteEndpoint remote, KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) 
+    {
+        this.m_threadNumber = a_threadNumber;
+        this.m_stream = a_stream;
+        this.remote = remote;
+    }
+ 
+    public void run() 
+    {
+		logger.debug( "calling KafkaConsumer.run()" );
+		ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
+    
+		while (it.hasNext())
+		{    
+			String message = new String(it.next().message());
+			try 
+			{
+				remote.sendString( message );
+			} 
+			catch (IOException e) 
+			{
+				logger.error( "Failed sending message to websocket remote", e );
+			}
+		}
+    	
+		logger.debug("Shutting down Thread: " + m_threadNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
new file mode 100644
index 0000000..a8a0305
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
@@ -0,0 +1,106 @@
+package com.opensoc.dataservices.kafkaclient.poll;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PollingKafkaClient 
+{
+	private static final Logger logger = LoggerFactory.getLogger( PollingKafkaClient.class );
+	
+	private final ConsumerConnector consumer;
+    private final String topic;
+    private  ExecutorService executor;	
+       
+    public PollingKafkaClient(String zooKeeper, String groupId, String topic) 
+    {
+        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+                createConsumerConfig(zooKeeper, groupId ));
+        
+        this.topic = topic;
+	}
+
+	private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", a_zookeeper);
+        props.put("group.id", a_groupId);
+        props.put( "zookeeper.session.timeout.ms", "1000");
+        props.put( "zookeeper.sync.time.ms", "200");
+        props.put( "auto.commit.interval.ms", "200");
+        props.put( "auto.offset.reset", "smallest");
+        // props.put( "fetch.min.bytes",  "1" );
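+        // consumer.timeout.ms makes the stream iterator throw
+        // ConsumerTimeoutException after 1s of inactivity, which is what
+        // allows fetchMessages() to return instead of blocking forever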
+        props.put( "consumer.timeout.ms", "1000" );
+        
+        return new ConsumerConfig(props);
+    }    
+    
+    public void shutdown() {
+    	
+    	logger.info( "Client shutdown() method invoked" );
+    	
+        if (consumer != null) { 
+        	consumer.shutdown();
+        }
+        
+        if (executor != null) { 
+        	executor.shutdown(); 
+        }
+    }    
+    
+    public void run(int a_numThreads) {
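+    	// no-op: messages are retrieved on demand via fetchMessages()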
+
+    } 
+    
+    
+    public List<String> fetchMessages()
+    {
+    	List<String> messages = new ArrayList<String>();
+    	
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(topic, Integer.valueOf(1));
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+ 
+        logger.debug( "streams.size = " + streams.size() );
+        
+        // now launch all the threads
+        //
+        executor = Executors.newFixedThreadPool(1);
+        
+        // now create an object to consume the messages
+        //
+        int threadNumber = 0;
+        CountDownLatch latch = new CountDownLatch( streams.size() );
+        
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
+            executor.submit(new PollingKafkaConsumer(messages, stream, threadNumber, latch ));
+            threadNumber++;
+        }    	
+    	
+        try {
+        	latch.await();
+        } 
+        catch (InterruptedException e) {
+        	Thread.currentThread().interrupt();
+        	logger.warn( "Interrupted while awaiting Kafka consumer threads", e );
+        }
+        
+    	return messages;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
new file mode 100644
index 0000000..ee59b1f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
@@ -0,0 +1,54 @@
+package com.opensoc.dataservices.kafkaclient.poll;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PollingKafkaConsumer implements Runnable 
+{
+	private static final Logger logger = LoggerFactory.getLogger( PollingKafkaConsumer.class );
+
+	private KafkaStream<byte[], byte[]> m_stream;
+    private int m_threadNumber;
+    private List<String> messages;
+    private CountDownLatch latch;
+    
+    public PollingKafkaConsumer( List<String> messages, KafkaStream<byte[], byte[]> a_stream, int a_threadNumber, CountDownLatch latch ) 
+    {
+        this.m_threadNumber = a_threadNumber;
+        this.m_stream = a_stream;
+        this.messages = messages;
+        this.latch = latch;
+    }
+ 
+    public void run() 
+    {
+		logger.debug( "calling PollingKafkaConsumer.run()" );
+		ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
+    
+		try
+		{
+			while (it.hasNext())
+			{    
+				String message = new String(it.next().message());
+				logger.debug( "adding message: " + message);
+				messages.add(message);
+			}
+		}
+		catch( Exception e)
+		{
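+			// a ConsumerTimeoutException lands here once the topic goes idle
+			// (see consumer.timeout.ms), ending this poll cycle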
+			logger.error( "Exception waiting on Kafka...", e );
+		}
+		
+		latch.countDown();
+		
+		logger.debug("Shutting down Thread: " + m_threadNumber);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
new file mode 100644
index 0000000..bd741be
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
@@ -0,0 +1,40 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.opensoc.alerts.server.AlertsCacheReaper;
+import com.opensoc.alerts.server.AlertsProcessingServer;
+import com.opensoc.alerts.server.AlertsSearcher;
+
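+/**
+ * Guice module for the alerts-processing server: binds the processing
+ * server, searcher, and cache reaper as singletons and provides the config.
+ */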
+public class AlertsServerModule extends AbstractModule {
+	
+	private static final Logger logger = LoggerFactory.getLogger( AlertsServerModule.class );	
+	
+	private Properties configProps;
+	
+	public AlertsServerModule( final Properties configProps ) {
+		this.configProps = configProps;
+	}
+	
+	@Override
+	protected void configure() {
+		bind( AlertsProcessingServer.class).in(Singleton.class);
+		bind( AlertsSearcher.class).in(Singleton.class);
+		bind( AlertsCacheReaper.class ).in(Singleton.class );
+	}
+	
+	@Provides Properties getConfigProps()
+	{
+		return configProps;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
new file mode 100644
index 0000000..3e6d3b5
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
@@ -0,0 +1,51 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.Properties;
+
+import org.apache.shiro.guice.web.ShiroWebModule;
+import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Singleton;
+import com.google.inject.servlet.ServletModule;
+import com.opensoc.dataservices.servlet.LoginServlet;
+import com.opensoc.dataservices.servlet.LogoutServlet;
+import com.opensoc.dataservices.websocket.KafkaMessageSenderServlet;
+import com.opensoc.dataservices.websocket.KafkaWebSocketCreator;
+
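+/**
+ * Servlet bindings: Shiro's Guice filter, the RESTEasy dispatcher on
+ * /rest/*, the websocket servlet on /ws/*, and the login/logout servlets.
+ */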
+public class DefaultServletModule extends ServletModule {
+    
+	private static final Logger logger = LoggerFactory.getLogger( DefaultServletModule.class );	
+	
+    private Properties configProps;
+
+    public DefaultServletModule( final Properties configProps ) {
+        this.configProps = configProps;
+    }	
+	
+	@Override
+    protected void configureServlets() {
+        
+		ShiroWebModule.bindGuiceFilter(binder());
+		
+		bind( KafkaWebSocketCreator.class ).in(Singleton.class);
+		
+        bind( HttpServletDispatcher.class ).in(Singleton.class);
+        serve( "/rest/*").with(HttpServletDispatcher.class);
+        
+        bind( KafkaMessageSenderServlet.class ).in(Singleton.class);
+		serve( "/ws/*").with(KafkaMessageSenderServlet.class );
+		
+		bind( LoginServlet.class).in(Singleton.class);
+		serve( "/login" ).with( LoginServlet.class );
+        
+		bind( LogoutServlet.class).in(Singleton.class);
+		serve( "/logout" ).with( LogoutServlet.class );
+		
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
new file mode 100644
index 0000000..bce4fce
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
@@ -0,0 +1,91 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.servlet.ServletContext;
+
+import org.apache.shiro.guice.web.ShiroWebModule;
+import org.apache.shiro.web.filter.authc.LogoutFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Provides;
+import com.google.inject.name.Names;
+import com.opensoc.dataservices.auth.CustomDomainADRealm;
+
+public class DefaultShiroWebModule extends ShiroWebModule {
+    
+	private static final Logger logger = LoggerFactory.getLogger( DefaultShiroWebModule.class );
+	
+	private Properties configProps;
+	
+	public DefaultShiroWebModule(final ServletContext sc) {
+        super(sc);
+    }
+
+    public DefaultShiroWebModule(final Properties configProps, final ServletContext sc) {
+        super(sc);
+        this.configProps = configProps;
+    }    
+    
+    protected void configureShiroWeb() {
+        bindConstant().annotatedWith(Names.named("shiro.loginUrl")).to( "/login.jsp" );
+    	bindRealm().to(CustomDomainADRealm.class);
+    	bind( LogoutFilter.class);
+        
+        addFilterChain("/login", ANON);
+        addFilterChain("/logout", ANON);
+        addFilterChain("/withsocket.jsp", AUTHC );
+        addFilterChain("/withsocket2.jsp", ANON );
+    }
+    
+    @Provides 
+    @javax.inject.Singleton 
+    CustomDomainADRealm providesRealm()
+    {
+    	
+    	CustomDomainADRealm realm = new CustomDomainADRealm();
+    	
+    	String ldapUrl = configProps.getProperty("ldapUrl");
+    	logger.info( "got ldapurl from config: " + ldapUrl );
+    	realm.setUrl(ldapUrl);
+    	
+    	// String ldapAuthMechanism = configProps.getProperty( "ldapAuthMechanism", "simple" ).trim();
+    	// logger.info( "got ldapAuthMechanism from config: " + ldapAuthMechanism );
+    	
+    	
+    	String activeDirectorySystemUsername = configProps.getProperty( "activeDirectorySystemUsername" ).trim();
+    	logger.info( "got activeDirectorySystemUsername from config: " + activeDirectorySystemUsername );
+    	realm.setSystemUsername(activeDirectorySystemUsername);
+    	
+    	String activeDirectorySystemPassword = configProps.getProperty( "activeDirectorySystemPassword" ).trim();
+    	logger.info( "got activeDirectorySystemPassword from config (value not logged)" );
+    	realm.setSystemPassword(activeDirectorySystemPassword);
+
+    	String adDomain = configProps.getProperty( "adDomain" ).trim();
+    	realm.setCustomDomain( adDomain );
+    	
+    	String activeDirectoryBaseSearchDN = configProps.getProperty( "activeDirectoryBaseSearchDN" ).trim();
+    	logger.info( "got activeDirectoryBaseSearchDN from config: " + activeDirectoryBaseSearchDN );
+    	realm.setSearchBase( activeDirectoryBaseSearchDN );
+    	
+    	String groupRolesMapStr = configProps.getProperty( "groupRolesMap" );
+    	logger.info( "got groupRolesMapStr from config: " + groupRolesMapStr );
+    	
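+    	// expected format: "group1:role1|group2:role2"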
+    	String[] mappings = groupRolesMapStr.split( "\\|" );
+    	
+    	Map<String,String> groupRolesMap = new HashMap<String, String>();
+    	for( String mapping : mappings )
+    	{
+    		logger.debug( "mapping: " + mapping );
+    		String[] mappingParts = mapping.split(":");
+    		groupRolesMap.put( mappingParts[0], mappingParts[1]);
+    	}
+    	
+    	realm.setGroupRolesMap(groupRolesMap);
+    	return realm;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
new file mode 100644
index 0000000..14dfdb8
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
@@ -0,0 +1,20 @@
+package com.opensoc.dataservices.modules.guice;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.opensoc.dataservices.auth.RestSecurityInterceptor;
+import com.opensoc.dataservices.rest.Index;
+
+public class RestEasyModule extends AbstractModule {
+	
+	private static final Logger logger = LoggerFactory.getLogger( RestEasyModule.class );
+	
+	@Override
+	protected void configure() {
+		
+		bind( Index.class );
+		bind( RestSecurityInterceptor.class );
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
new file mode 100644
index 0000000..5271674
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
@@ -0,0 +1,34 @@
+package com.opensoc.dataservices.modules.guice;
+
+import javax.inject.Singleton;
+
+import org.jboss.resteasy.plugins.guice.ext.RequestScopeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Provides;
+import com.opensoc.dataservices.common.OpenSOCService;
+import com.opensoc.services.alerts.ElasticSearch_KafkaAlertsService;
+import com.opensoc.services.alerts.Solr_KafkaAlertsService;
+
+public class ServiceModule extends RequestScopeModule {
+
+	private static final Logger logger = LoggerFactory.getLogger( ServiceModule.class );
+	
+    private String[] args;
+
+    public ServiceModule(String[] args) {
+        this.args = args;
+    }
+
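+    // the first command-line argument selects the alerts backend; Solr is the default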
+    @Provides
+    @Singleton
+    public OpenSOCService socservice() {
+        if (args.length > 0 && args[0].equals("ElasticSearch_KafkaAlertsService")) {
+            return new ElasticSearch_KafkaAlertsService();
+        } else {
+            return new Solr_KafkaAlertsService();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
new file mode 100644
index 0000000..d9916dc
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
@@ -0,0 +1,53 @@
+package com.opensoc.dataservices.rest;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.opensoc.dataservices.auth.AuthTokenFilter;
+import com.opensoc.dataservices.kafkaclient.poll.PollingKafkaClient;
+
+@Path("/")
+public class Index 
+{
+	private static final Logger logger = LoggerFactory.getLogger( Index.class );
+	
+	@Inject
+	private Properties configProps;
+	
+	@AuthTokenFilter
+	@GET
+	@Path("/alerts/{groupId}")
+	public Response getAlerts( @PathParam("groupId") String groupId ) 
+	{
+		String zooKeeperHost = configProps.getProperty( "kafkaZookeeperHost" );
+		logger.info( "kafkaZookeeperHost: " + zooKeeperHost );
+		String zooKeeperPort = configProps.getProperty( "kafkaZookeeperPort" );
+		logger.info( "kafkaZookeeperPort: " + zooKeeperPort );
+		
+		logger.warn( "got groupId from path as: " + groupId );
+		
+		PollingKafkaClient client = new PollingKafkaClient( zooKeeperHost + ":" + zooKeeperPort, groupId, "test"); 
+		List<String> messages = client.fetchMessages();
+		client.shutdown();
+		logger.warn( "found " + messages.size() + " messages in Kafka" );
+		
+		StringBuilder resp = new StringBuilder( "<html><body><h2>Messages:</h2><ul>" );
+		
+		for( String msg : messages )
+		{
+			resp.append( "<li>" ).append( msg ).append( "</li>" );
+		}
+		
+		return Response.status(200).entity( resp.append( "</ul></body></html>" ).toString() ).build();
+				
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
new file mode 100644
index 0000000..650b6d4
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
@@ -0,0 +1,33 @@
+package com.opensoc.dataservices.rest;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.core.Application;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestServices extends Application 
+{
+	private static final Logger logger = LoggerFactory.getLogger( RestServices.class );
+	
+	private static Set<Object> services = new HashSet<Object>();
+
+	public RestServices() 
+	{
+		// initialize restful services
+		services.add(new Index());
+	}
+
+	@Override
+	public Set<Object> getSingletons() 
+	{
+		return services;
+	}
+
+	public static Set<Object> getServices() 
+	{
+		return services;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
new file mode 100644
index 0000000..56d2c3e
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
@@ -0,0 +1,115 @@
+package com.opensoc.dataservices.servlet;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.shiro.SecurityUtils;
+import org.apache.shiro.authc.AuthenticationException;
+import org.apache.shiro.authc.ExcessiveAttemptsException;
+import org.apache.shiro.authc.IncorrectCredentialsException;
+import org.apache.shiro.authc.LockedAccountException;
+import org.apache.shiro.authc.UnknownAccountException;
+import org.apache.shiro.authc.UsernamePasswordToken;
+import org.apache.shiro.subject.Subject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.opensoc.dataservices.auth.AuthToken;
+
+public class LoginServlet extends HttpServlet 
+{
+	private static final Logger logger = LoggerFactory.getLogger( LoginServlet.class );
+	
+	private static final long serialVersionUID = 1L;
+
+	@Inject
+	private Properties configProps;
+	
+	@Override
+	public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 
+	{
+		doPost( req, resp );
+	}
+	
+	@Override
+	public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 
+	{	
+		String username = req.getParameter("username" );
+		String password = req.getParameter("password" );
+		UsernamePasswordToken token = new UsernamePasswordToken(username, password);
+	
+		logger.info( "Doing login for user: " + username );
+		
+		Subject currentUser = SecurityUtils.getSubject();
+
+		try 
+		{
+		    currentUser.login(token);
+		} 
+		catch ( UnknownAccountException uae ) 
+		{
+			logger.warn( "Failing login with 401:", uae );
+			resp.sendError(401);
+			return;
+		} 
+		catch ( IncorrectCredentialsException ice ) 
+		{
+			logger.warn( "Failing login with 401:", ice );
+			resp.sendError(401);
+			return;
+		} 
+		catch ( LockedAccountException lae ) 
+		{
+			logger.warn( "Failing login with 401:", lae ); 
+			resp.sendError(401);
+			return;
+		} 
+		catch ( ExcessiveAttemptsException eae ) 
+		{
+			logger.warn( "Failing login with 401:", eae );
+			resp.sendError(401);
+			return;
+		}  
+		catch ( AuthenticationException ae ) 
+		{
+			logger.warn( "Failing login with 401:", ae );
+			resp.sendError(401);
+			return;
+		}
+		
+		
+		if( currentUser.hasRole("ShiroUsersRole") )
+		{
+			try
+			{
+			
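+				// issue the auth token cookie that RestSecurityInterceptor
+				// validates on subsequent REST requests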
+				Cookie authTokenCookie = new Cookie("authToken", AuthToken.generateToken(configProps));
+				resp.addCookie(authTokenCookie);
+				
+				// resp.setStatus(HttpServletResponse.SC_OK);
+				resp.sendRedirect( "/withsocket.jsp" );
+			}
+			catch( Exception e )
+			{
+				logger.error( "Failed creating authToken cookie.", e );
+				resp.sendError( 500 );
+				return;
+			}
+		}
+		else
+		{
+			logger.error("User does not have required role!");
+			resp.sendError(401);
+			return;
+		}
+	}	
+}
\ No newline at end of file