You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by om...@apache.org on 2015/12/08 07:38:11 UTC
[47/51] [partial] incubator-metron git commit: Initial import of code
from https://github.com/OpenSOC/opensoc at
ac0b00373f8f56dfae03a8109af5feb373ea598e.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup b/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
new file mode 100644
index 0000000..a122057
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/pom.xml.versionsBackup
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?><!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Streaming</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>OpenSOC-DataLoads</artifactId>
+ <properties>
+ <opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+ <storm.version>0.9.1-incubating</storm.version>
+ <hbase.version>0.98.0-hadoop2</hbase.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Common</artifactId>
+ <version>${opensoc.common.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src</sourceDirectory>
+ <resources>
+ <resource>
+ <directory>src</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>classworlds:classworlds</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>jmock:*</exclude>
+ <exclude>*:xml-apis</exclude>
+ <exclude>org.apache.maven:lib:tests</exclude>
+ <exclude>log4j:log4j:jar:</exclude>
+ <exclude>*:hbase:*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java b/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
new file mode 100644
index 0000000..cdf0541
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/src/com/opensoc/dataloads/cif/HBaseTableLoad.java
@@ -0,0 +1,122 @@
+package com.opensoc.dataloads.cif;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedInputStream;
+
+public class HBaseTableLoad {
+
+ private static Configuration conf = null;
+ private final static String hbaseTable = "cif_table";
+ /**
+ * Initialization
+ */
+ static {
+ conf = HBaseConfiguration.create();
+ }
+
+ public static void main(String[] args) {
+
+ LoadDirHBase(args[0]);
+
+ }
+
+ public static void LoadDirHBase(String dirName) {
+ System.out.println("Working on:" + dirName);
+ File folder = new File(dirName);
+ File[] listOfFiles = folder.listFiles();
+
+ for (int i = 0; i < listOfFiles.length; i++) {
+ File file = listOfFiles[i];
+
+ if (file.isFile() && file.getName().endsWith(".gz")) {
+
+ // e.g. folder name is infrastructure_botnet. Col Qualifier is
+ // botnet and col_family is infrastructure
+
+ String col_family = folder.getName().split("_")[0];
+ String col_qualifier = folder.getName().split("_")[1];
+
+ // Open gz file
+ try {
+ InputStream input = new BufferedInputStream(
+ new GZIPInputStream(new FileInputStream(file)));
+
+ HBaseBulkPut(input, col_family, col_qualifier);
+
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ } else if (file.isDirectory()) // if sub-directory then call the
+ // function recursively
+ LoadDirHBase(file.getAbsolutePath());
+ }
+ }
+
+ /**
+ * @param input
+ * @param hbaseTable
+ * @param col_family
+ * @throws IOException
+ * @throws ParseException
+ *
+ *
+ * Inserts all json records picked up from the inputStream
+ */
+ public static void HBaseBulkPut(InputStream input, String col_family,
+ String col_qualifier) throws IOException, ParseException {
+
+ HTable table = new HTable(conf, hbaseTable);
+ JSONParser parser = new JSONParser();
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(input));
+ String jsonString;
+ List<Put> allputs = new ArrayList<Put>();
+ Map json;
+
+ while ((jsonString = br.readLine()) != null) {
+
+ try {
+
+ json = (Map) parser.parse(jsonString);
+ } catch (ParseException e) {
+ //System.out.println("Unable to Parse: " +jsonString);
+ continue;
+ }
+ // Iterator iter = json.entrySet().iterator();
+
+ // Get Address - either IP/domain or email and make that the Key
+ Put put = new Put(Bytes.toBytes((String) json.get("address")));
+
+ // We are just adding a "Y" flag to mark this address
+ put.add(Bytes.toBytes(col_family), Bytes.toBytes(col_qualifier),
+ Bytes.toBytes("Y"));
+
+ allputs.add(put);
+ }
+ table.put(allputs);
+ System.out.println("---------------Values------------------"
+ + hbaseTable);
+ table.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml b/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
new file mode 100644
index 0000000..a73469d
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/src/hbase-site.xml
@@ -0,0 +1,100 @@
+<!--Tue Feb 11 02:34:08 2014 -->
+<configuration>
+
+ <property>
+ <name>hbase.regionserver.global.memstore.lowerLimit</name>
+ <value>0.38</value>
+ </property>
+ <property>
+ <name>zookeeper.session.timeout</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>hbase.security.authorization</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hbase.hstore.flush.retries.number</name>
+ <value>120</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.block.multiplier</name>
+ <value>4</value>
+ </property>
+ <property>
+ <name>hbase.hstore.blockingStoreFiles</name>
+ <value>200</value>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.global.memstore.upperLimit</name>
+ <value>0.4</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.mslab.enabled</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.client.keyvalue.maxsize</name>
+ <value>10485760</value>
+ </property>
+ <property>
+ <name>hbase.superuser</name>
+ <value>hbase</value>
+ </property>
+ <property>
+ <name>hfile.block.cache.size</name>
+ <value>0.40</value>
+ </property>
+ <property>
+ <name>zookeeper.znode.parent</name>
+ <value>/hbase-unsecure</value>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>10737418240</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>hbase.security.authentication</name>
+ <value>simple</value>
+ </property>
+ <property>
+ <name>hbase.client.scanner.caching</name>
+ <value>100</value>
+ </property>
+ <property>
+ <name>hbase.hregion.memstore.flush.size</name>
+ <value>134217728</value>
+ </property>
+ <property>
+ <name>hbase.hregion.majorcompaction</name>
+ <value>86400000</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>2181</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>zkpr1</value>
+ </property>
+
+ <property>
+ <name>hbase.client.write.buffer</name>
+ <value>500000000</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/conf/config.properties b/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
new file mode 100644
index 0000000..7c060fb
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/conf/config.properties
@@ -0,0 +1,21 @@
+# sample config file for the Alerts/Proxy service. Fill in
+# the settings below with values appropriate to your environment.
+
+alertsServiceImpl=elasticsearch
+ldapUrl=ldap://ec2-54-88-217-194.compute-1.amazonaws.com
+initialDelayTime=120
+searchIntervalTime=30
+elasticSearchHostName=localhost
+elasticSearchHostPort=9300
+kafkaBrokerHostName=ec2-example.compute-1.amazonaws.com
+kafkaBrokerHostPort=9092
+kafkaTopicName=test
+kafkaZookeeperHost=ec2-example.compute-1.amazonaws.com
+kafkaZookeeperPort=2181
+kafkaGroupId=abc123
+keystoreFile=./keystore
+keystorePassword=password
+authTokenAlias=authTokenKey
+authTokenMaxAge=10000
+alerts.cache.expiration.interval=360000
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/pom.xml b/opensoc-streaming/OpenSOC-DataServices/pom.xml
new file mode 100644
index 0000000..56b7372
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/pom.xml
@@ -0,0 +1,278 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-DataServices</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <slf4j.version>1.6.4</slf4j.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>apache-jsp</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-jsp</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-webapp</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-annotations</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>javax-websocket-server-impl</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-server</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-common</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-api</artifactId>
+ <version>9.2.1.v20140609</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-parent</artifactId>
+ <version>9.2.1.v20140609</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <version>3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ <version>3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>jsr250-api</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxrs</artifactId>
+ <version>3.0.7.Final</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>httpclient</artifactId>
+ <groupId>org.apache.httpcomponents</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jboss-annotations-api_1.1_spec</artifactId>
+ <groupId>org.jboss.spec.javax.annotation</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jcip-annotations</artifactId>
+ <groupId>net.jcip</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-guice</artifactId>
+ <version>3.0.7.Final</version>
+ </dependency>
+
+ <!-- Apache Shiro -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shiro</groupId>
+ <artifactId>shiro-core</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shiro</groupId>
+ <artifactId>shiro-web</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shiro</groupId>
+ <artifactId>shiro-guice</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <!-- Required in the sample apps only for 3rd-party libraries that expect
+ to call the commons logging APIs -->
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <!-- Kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.1.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- TESTS -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-client</artifactId>
+ <version>3.0.7.Final</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <artifactId>httpclient</artifactId>
+ <groupId>org.apache.httpcomponents</groupId>
+ <type>jar</type>
+ <version>4.3.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.9.5</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <inherited>true</inherited>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ <mainClass>com.opensoc.dataservices.Main</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>com.opensoc.dataservices.Main</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <configuration>
+ <mainClass>com.opensoc.dataservices.Main</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
new file mode 100644
index 0000000..d150833
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsCacheReaper.java
@@ -0,0 +1,45 @@
+package com.opensoc.alerts.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.inject.Inject;
+
+public class AlertsCacheReaper implements Runnable {
+
+ private AlertsSearcher searcher;
+
+ @Inject
+ private Properties configProps;
+
+ public void setSearcher(AlertsSearcher searcher) {
+ this.searcher = searcher;
+ }
+
+ @Override
+ public void run() {
+
+ long expireAlertsCacheInterval = Long.parseLong( configProps.getProperty( "alerts.cache.expiration.interval", "360000" ) );
+ long timeNow = System.currentTimeMillis();
+
+ long cutOffTime = timeNow - expireAlertsCacheInterval;
+
+ List<String> forRemoval = new ArrayList<String>();
+
+ for( Map.Entry<String, AlertsFilterCacheEntry> entry : searcher.alertsFilterCache.entrySet() )
+ {
+ // if entry was saved more than X timeunits ago, remove it
+ if( entry.getValue().storedAtTime < cutOffTime )
+ {
+ forRemoval.add(entry.getKey());
+ }
+ }
+
+ for( String key : forRemoval )
+ {
+ searcher.alertsFilterCache.remove(key);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
new file mode 100644
index 0000000..7a0f687
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsFilterCacheEntry.java
@@ -0,0 +1,17 @@
+package com.opensoc.alerts.server;
+
+public class AlertsFilterCacheEntry {
+
+
+ public String sourceData;
+ public long storedAtTime;
+
+
+ public AlertsFilterCacheEntry(String sourceData, long timeNow) {
+ this.sourceData = sourceData;
+ this.storedAtTime = timeNow;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
new file mode 100644
index 0000000..3cf6246
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsProcessingServer.java
@@ -0,0 +1,44 @@
+package com.opensoc.alerts.server;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.opensoc.dataservices.Main;
+
+public class AlertsProcessingServer {
+
+ private static final Logger logger = LoggerFactory.getLogger( AlertsProcessingServer.class );
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ @Inject
+ private AlertsSearcher searcher;
+ @Inject
+ private AlertsCacheReaper reaper;
+ @Inject
+ private Properties configProps;
+
+ public void startProcessing() {
+
+ logger.debug( "startProcessing() invoked" );
+
+ int initialDelayTime = Integer.parseInt( configProps.getProperty( "searchInitialDelayTime", "30" ) );
+ int searchIntervalTime = Integer.parseInt( configProps.getProperty( "searchIntervalTime", "30" ) );
+
+ reaper.setSearcher(searcher);
+
+ final ScheduledFuture<?> alertsSearcherHandle =
+ scheduler.scheduleAtFixedRate( searcher, initialDelayTime, searchIntervalTime, SECONDS );
+
+ scheduler.scheduleAtFixedRate(reaper, 120, 380, SECONDS);
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
new file mode 100644
index 0000000..15db704
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/alerts/server/AlertsSearcher.java
@@ -0,0 +1,237 @@
+package com.opensoc.alerts.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class AlertsSearcher implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger( AlertsSearcher.class );
+
+ @Inject
+ private Properties configProps;
+
+ // TODO: inject Searcher module for either ElasticSearch or Solr...
+ // TODO: inject OpenSocServiceFactory here
+
+ public final Map<String, AlertsFilterCacheEntry> alertsFilterCache = new HashMap<String, AlertsFilterCacheEntry>();
+ MessageDigest md;
+
+ public AlertsSearcher() throws NoSuchAlgorithmException
+ {
+ md = MessageDigest.getInstance("SHA-256");
+
+ }
+
+ @Override
+ public void run() {
+
+ try
+ {
+ logger.debug( "Doing Elastic Search search..." );
+
+ long currentSearchTime = System.currentTimeMillis();
+ long lastSearchTime = 0L;
+
+ // look for a marker that tells us the last time we ran...
+ String homeDir = configProps.getProperty("homeDir");
+ if( homeDir.endsWith( "/" )) {
+ homeDir = homeDir.substring(0, homeDir.length()-1);
+ }
+
+ logger.info( "using homeDir = " + homeDir );
+
+ File searcherStateFile = new File( homeDir + "/searcherState.properties" );
+ if( searcherStateFile.exists() )
+ {
+ logger.info( "found existing searcherState.properties file" );
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream( searcherStateFile );
+ Properties searcherState = new Properties();
+ searcherState.load(fis);
+ lastSearchTime = Long.parseLong( searcherState.getProperty("lastSearchTime"));
+ }
+ catch( FileNotFoundException e ) {
+ logger.error( "Error locating lastSearchTime value from state file", e );
+
+ } catch (IOException e) {
+ logger.error( "Error locating lastSearchTime value from state file", e );
+ }
+ finally
+ {
+ try {
+ fis.close();
+ } catch (IOException e) {
+ logger.error( "Probably ignorable error closing file stream: ", e );
+ }
+ }
+ }
+ else
+ {
+ // nothing to do here. We'll write out our lastSearchTime at the end
+ logger.info( "No existing searcherState.properties found" );
+ }
+
+ // search for alerts newer than "lastSearchTime"
+ Settings settings = ImmutableSettings.settingsBuilder()
+ .put("client.transport.sniff", true).build();
+ // .put("cluster.name", "elasticsearch").build();
+
+ Client client = null;
+ try
+ {
+ logger.info( "initializing elasticsearch client" );
+
+ String elasticSearchHostName = configProps.getProperty( "elasticSearchHostName", "localhost" );
+ int elasticSearchHostPort = Integer.parseInt(configProps.getProperty( "elasticSearchHostPort", "9300" ) );
+ client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(elasticSearchHostName, elasticSearchHostPort));
+
+ logger.info( "lastSearchTime: " + lastSearchTime );
+
+ String alertsIndexes = configProps.getProperty( "alertsIndexes", "alerts" );
+ String[] alertsIndexArray = alertsIndexes.split(",");
+
+ String alertsTypes = configProps.getProperty( "alertsTypes", "" );
+ String[] alertsTypesArray = alertsTypes.split( ",");
+
+ String alertsQueryFieldName = configProps.getProperty( "alertQueryFieldName", "alert.source" );
+ String alertsQueryFieldValue = configProps.getProperty( "alertsQueryFieldValue", "*" );
+
+ logger.info( "alertsIndexes: " + alertsIndexes );
+
+ String[] foo = new String[1];
+
+ SearchResponse response = client.prepareSearch( alertsIndexArray )
+ .setTypes( alertsTypesArray )
+ .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+ .addField("_source")
+ .setQuery( QueryBuilders.boolQuery().must( QueryBuilders.wildcardQuery( alertsQueryFieldName, alertsQueryFieldValue ) )
+ .must( QueryBuilders.rangeQuery("message.timestamp").from(lastSearchTime).to(System.currentTimeMillis()).includeLower(true).includeUpper(false)))
+ .execute()
+ .actionGet();
+
+ SearchHits hits = response.getHits();
+ logger.debug( "Total hits: " + hits.getTotalHits());
+
+
+ // for all hits, put the alert onto the Kafka topic.
+ for( SearchHit hit : hits )
+ {
+ // calculate hash for this hit...
+ String sourceData = hit.getSourceAsString();
+ String hash = new String( md.digest(sourceData.getBytes()));
+
+ if( alertsFilterCache.containsKey(hash))
+ {
+ logger.warn( "We have already seen this Alert, so skipping..." );
+ continue;
+ }
+ else
+ {
+ long timeNow = System.currentTimeMillis();
+ AlertsFilterCacheEntry cacheEntry = new AlertsFilterCacheEntry( sourceData, timeNow );
+ alertsFilterCache.put(hash, cacheEntry);
+ }
+
+ doSenderWork(hit);
+ }
+
+ }
+ finally
+ {
+ if( client != null )
+ {
+ client.close();
+ }
+ }
+
+ // record the time we just searched
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream( searcherStateFile );
+ Properties searcherState = new Properties();
+ searcherState.setProperty( "lastSearchTime", Long.toString(currentSearchTime));
+ searcherState.store(fos, "");
+
+ }
+ catch( FileNotFoundException e ) {
+ logger.error( "Error saving lastSearchTime: ", e );
+ } catch (IOException e) {
+ logger.error( "Error saving lastSearchTime: ", e );
+ }
+ finally {
+
+ try {
+ fos.close();
+ }
+ catch (IOException e) {
+ logger.error( "Probably ignorable error closing file stream: ", e );
+ }
+ }
+
+ logger.info( "Done with ElasticSearch search... " );
+ }
+ catch( Exception e )
+ {
+ logger.error( "Unexpected error while searching ElasticSearch index:", e );
+ }
+ }
+
+ private void doSenderWork( SearchHit hit )
+ {
+ String kafkaBrokerHostName = configProps.getProperty("kafkaBrokerHostName", "localhost" );
+ String kafkaBrokerHostPort = configProps.getProperty("kafkaBrokerHostPort", "9092" );
+ String kafkaTopicName = configProps.getProperty("kafkaTopicName", "test" );
+
+ logger.debug( "kafkaBrokerHostName: " + kafkaBrokerHostName );
+ logger.debug( "kafkaBrokerHostPort: " + kafkaBrokerHostPort );
+ logger.debug( "kafkaTopicName: " + kafkaTopicName );
+
+ String sourceData = hit.getSourceAsString();
+
+ logger.debug( "Source Data: " + sourceData );
+ Properties props = new Properties();
+
+ props.put("metadata.broker.list", kafkaBrokerHostName + ":" + kafkaBrokerHostPort );
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ // props.put("partitioner.class", "example.producer.SimplePartitioner");
+ props.put("request.required.acks", "1");
+
+ ProducerConfig config = new ProducerConfig(props);
+
+ Producer<String, String> producer = new Producer<String, String>(config);
+
+ KeyedMessage<String, String> data = new KeyedMessage<String, String>(kafkaTopicName, "", sourceData );
+
+ producer.send(data);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
new file mode 100644
index 0000000..f34dc50
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/Main.java
@@ -0,0 +1,288 @@
+package com.opensoc.dataservices;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Properties;
+
+import javax.servlet.DispatcherType;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.jasper.servlet.JspServlet;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.shiro.web.env.EnvironmentLoaderListener;
+import org.apache.tomcat.InstanceManager;
+import org.apache.tomcat.SimpleInstanceManager;
+import org.eclipse.jetty.annotations.ServletContainerInitializersStarter;
+import org.eclipse.jetty.apache.jsp.JettyJasperInitializer;
+import org.eclipse.jetty.plus.annotation.ContainerInitializer;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.jboss.resteasy.plugins.guice.GuiceResteasyBootstrapServletContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.servlet.GuiceFilter;
+import com.opensoc.alerts.server.AlertsProcessingServer;
+import com.opensoc.dataservices.modules.guice.AlertsServerModule;
+import com.opensoc.dataservices.modules.guice.DefaultServletModule;
+import com.opensoc.dataservices.modules.guice.DefaultShiroWebModule;
+
+public class Main {
+
+ static int port = 9091;
+
+ private static final String WEBROOT_INDEX = "/webroot/";
+
+ private static final Logger logger = LoggerFactory.getLogger( Main.class );
+
+ public static void main(String[] args) throws Exception {
+
+
+ Options options = new Options();
+
+ options.addOption( "homeDir", true, "Home directory for the service" );
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine cmd = parser.parse( options, args);
+
+ Properties configProps = new Properties();
+
+ String homeDir = cmd.getOptionValue("homeDir");
+
+ if( homeDir.endsWith( "/" ))
+ {
+ homeDir = homeDir.substring(0, homeDir.length()-1);
+ }
+
+
+ DOMConfigurator.configure( homeDir + "/log4j.xml" );
+
+ logger.warn( "DataServices Server starting..." );
+
+
+ File configFile = new File( homeDir + "/config.properties" );
+ FileReader configFileReader = new FileReader( configFile );
+ try
+ {
+ configProps.load(configFileReader);
+
+ Option[] cmdOptions = cmd.getOptions();
+ for( Option opt : cmdOptions )
+ {
+ String argName = opt.getOpt();
+ String argValue = opt.getValue();
+
+ configProps.put(argName, argValue);
+ }
+
+ }
+ finally
+ {
+ if( configFileReader != null )
+ {
+ configFileReader.close();
+ }
+ }
+
+ WebAppContext context = new WebAppContext();
+
+ Injector injector = Guice.createInjector( new DefaultServletModule(configProps),
+ new AlertsServerModule(configProps),
+ new DefaultShiroWebModule(configProps, context.getServletContext()),
+ new AbstractModule() {
+
+ @Override
+ protected void configure() {
+ binder().requireExplicitBindings();
+ bind(GuiceFilter.class);
+ bind( GuiceResteasyBootstrapServletContextListener.class );
+ bind( EnvironmentLoaderListener.class );
+
+ }
+ }
+ );
+
+
+ injector.getAllBindings();
+ injector.createChildInjector().getAllBindings();
+
+ Server server = new Server(port);
+
+ /***************************************************
+ *************** enable SSL ************************
+ ***************************************************/
+
+ // HTTP Configuration
+ HttpConfiguration http_config = new HttpConfiguration();
+ http_config.setSecureScheme("https");
+ http_config.setSecurePort(8443);
+ http_config.setOutputBufferSize(32768);
+ http_config.setRequestHeaderSize(8192);
+ http_config.setResponseHeaderSize(8192);
+ http_config.setSendServerVersion(true);
+ http_config.setSendDateHeader(false);
+ // httpConfig.addCustomizer(new ForwardedRequestCustomizer())
+ // SSL Context Factory
+ SslContextFactory sslContextFactory = new SslContextFactory();
+
+ String sslKeystorePath = configProps.getProperty( "sslKeystorePath", "/keystore" );
+ logger.debug( "sslKeystorePath: " + sslKeystorePath );
+ sslContextFactory.setKeyStorePath( homeDir + sslKeystorePath );
+
+ String sslKeystorePassword = configProps.getProperty( "sslKeystorePassword" );
+ sslContextFactory.setKeyStorePassword(sslKeystorePassword);
+
+ String sslKeyManagerPassword = configProps.getProperty( "sslKeyManagerPassword" );
+ if( sslKeyManagerPassword != null && !sslKeyManagerPassword.isEmpty() )
+ {
+ sslContextFactory.setKeyManagerPassword(sslKeyManagerPassword);
+ }
+
+ String sslTruststorePath = configProps.getProperty( "sslTruststorePath" );
+ if( sslTruststorePath != null && !sslTruststorePath.isEmpty() )
+ {
+ sslContextFactory.setTrustStorePath( homeDir + sslTruststorePath );
+ }
+
+ String sslTruststorePassword = configProps.getProperty( "sslTruststorePassword" );
+ if( sslTruststorePassword != null && !sslTruststorePassword.isEmpty())
+ {
+ sslContextFactory.setTrustStorePassword( sslTruststorePassword );
+ }
+
+ sslContextFactory.setExcludeCipherSuites(
+ "SSL_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_RSA_WITH_DES_CBC_SHA",
+ "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+ "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+ "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+ "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA");
+
+ // SSL HTTP Configuration
+ HttpConfiguration https_config = new HttpConfiguration(http_config);
+ https_config.addCustomizer(new SecureRequestCustomizer());
+
+ // SSL Connector
+ ServerConnector sslConnector = new ServerConnector(server,
+ new SslConnectionFactory(sslContextFactory,"http/1.1"),
+ new HttpConnectionFactory(https_config));
+ sslConnector.setPort(8443);
+ server.addConnector(sslConnector);
+
+
+ FilterHolder guiceFilter = new FilterHolder(injector.getInstance(GuiceFilter.class));
+
+
+ /** For JSP support. Used only for testing and debugging for now. This came come out
+ * once the real consumers for this service are in place
+ */
+ URL indexUri = Main.class.getResource(WEBROOT_INDEX);
+ if (indexUri == null)
+ {
+ throw new FileNotFoundException("Unable to find resource " + WEBROOT_INDEX);
+ }
+
+ // Points to wherever /webroot/ (the resource) is
+ URI baseUri = indexUri.toURI();
+
+ // Establish Scratch directory for the servlet context (used by JSP compilation)
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+ File scratchDir = new File(tempDir.toString(),"embedded-jetty-jsp");
+
+ if (!scratchDir.exists())
+ {
+ if (!scratchDir.mkdirs())
+ {
+ throw new IOException("Unable to create scratch directory: " + scratchDir);
+ }
+ }
+
+ // Set JSP to use Standard JavaC always
+ System.setProperty("org.apache.jasper.compiler.disablejsr199","false");
+
+ context.setAttribute("javax.servlet.context.tempdir",scratchDir);
+ context.setAttribute(InstanceManager.class.getName(), new SimpleInstanceManager());
+
+ //Ensure the jsp engine is initialized correctly
+ JettyJasperInitializer sci = new JettyJasperInitializer();
+ ServletContainerInitializersStarter sciStarter = new ServletContainerInitializersStarter(context);
+ ContainerInitializer initializer = new ContainerInitializer(sci, null);
+ List<ContainerInitializer> initializers = new ArrayList<ContainerInitializer>();
+ initializers.add(initializer);
+
+ context.setAttribute("org.eclipse.jetty.containerInitializers", initializers);
+ context.addBean(sciStarter, true);
+
+ // Set Classloader of Context to be sane (needed for JSTL)
+ // JSP requires a non-System classloader, this simply wraps the
+ // embedded System classloader in a way that makes it suitable
+ // for JSP to use
+ // new URL( "file:///home/prhodes/.m2/repository/javax/servlet/jsp/javax.servlet.jsp-api/2.3.1/javax.servlet.jsp-api-2.3.1.jar" )
+ ClassLoader jspClassLoader = new URLClassLoader(new URL[] {}, Thread.currentThread().getContextClassLoader());
+ context.setClassLoader(jspClassLoader);
+
+ // Add JSP Servlet (must be named "jsp")
+ ServletHolder holderJsp = new ServletHolder("jsp",JspServlet.class);
+ holderJsp.setInitOrder(0);
+ holderJsp.setInitParameter("logVerbosityLevel","DEBUG");
+ holderJsp.setInitParameter("fork","false");
+ holderJsp.setInitParameter("xpoweredBy","false");
+ holderJsp.setInitParameter("compilerTargetVM","1.7");
+ holderJsp.setInitParameter("compilerSourceVM","1.7");
+ holderJsp.setInitParameter("keepgenerated","true");
+ context.addServlet(holderJsp,"*.jsp");
+ //context.addServlet(holderJsp,"*.jspf");
+ //context.addServlet(holderJsp,"*.jspx");
+
+ // Add Default Servlet (must be named "default")
+ ServletHolder holderDefault = new ServletHolder("default",DefaultServlet.class);
+ holderDefault.setInitParameter("resourceBase",baseUri.toASCIIString());
+ holderDefault.setInitParameter("dirAllowed","true");
+ context.addServlet(holderDefault,"/");
+
+ /** end "for JSP support */
+
+
+ context.setResourceBase(baseUri.toASCIIString());
+
+ context.setInitParameter("resteasy.guice.modules", "com.opensoc.dataservices.modules.guice.RestEasyModule");
+ context.setInitParameter("resteasy.servlet.mapping.prefix", "/rest");
+
+ context.addEventListener(injector.getInstance(GuiceResteasyBootstrapServletContextListener.class));
+ context.addFilter(guiceFilter, "/*", EnumSet.allOf(DispatcherType.class));
+
+ server.setHandler(context);
+ server.start();
+
+ AlertsProcessingServer alertsServer = injector.getInstance(AlertsProcessingServer.class);
+
+ alertsServer.startProcessing();
+
+ server.join();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
new file mode 100644
index 0000000..5d4fe68
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthToken.java
@@ -0,0 +1,187 @@
+package com.opensoc.dataservices.auth;
+
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.util.Properties;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthToken {
+
+ private static final Logger logger = LoggerFactory.getLogger( AuthToken.class );
+
+ public static String generateToken( final Properties configProps ) throws Exception
+ {
+
+ KeyStore ks = KeyStore.getInstance("JCEKS");
+ String keystoreFile = configProps.getProperty( "keystoreFile" );
+ logger.info( "keystoreFile: " + keystoreFile );
+
+ String keystorePassword = configProps.getProperty( "keystorePassword" );
+ logger.info( "keystorePassword: " + keystorePassword );
+
+ String keystoreAlias = configProps.getProperty( "authTokenAlias" );
+ logger.info( "keystoreAlias: " + keystoreAlias );
+
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream( keystoreFile );
+ ks.load(fis, keystorePassword.toCharArray() );
+ }
+ catch( Exception e )
+ {
+ logger.error( "Error opening keyfile:", e );
+ throw e;
+ }
+ finally {
+ fis.close();
+ }
+
+ KeyStore.ProtectionParameter protParam =
+ new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+ KeyStore.SecretKeyEntry secretKeyEntry = (KeyStore.SecretKeyEntry)ks.getEntry(keystoreAlias, protParam);
+
+ SecretKey key = secretKeyEntry.getSecretKey();
+
+
+ Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.ENCRYPT_MODE, key);
+ String tokenString = "OpenSOC_AuthToken:" + System.currentTimeMillis();
+
+ byte[] encryptedData = cipher.doFinal(tokenString.getBytes());
+
+ String base64Token = new String( Base64.encodeBase64(encryptedData) );
+
+ // System.out.println( "base64Token: " + base64Token );
+
+ return base64Token;
+
+ }
+
+ public static boolean validateToken( final Properties configProps, String authToken ) throws Exception
+ {
+ KeyStore ks = KeyStore.getInstance("JCEKS");
+ String keystoreFile = configProps.getProperty( "keystoreFile" );
+ String keystorePassword = configProps.getProperty( "keystorePassword" );
+ String keystoreAlias = configProps.getProperty( "authTokenAlias" );
+ long tokenMaxAgeInMilliseconds = Long.parseLong( configProps.getProperty( "authTokenMaxAge", "600000" ));
+
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream( keystoreFile );
+ ks.load(fis, keystorePassword.toCharArray() );
+ }
+ finally {
+ if( fis != null) {
+ fis.close();
+ }
+ }
+
+ KeyStore.ProtectionParameter protParam =
+ new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+ KeyStore.SecretKeyEntry secretKeyEntry = (KeyStore.SecretKeyEntry)ks.getEntry(keystoreAlias, protParam);
+
+ SecretKey key = secretKeyEntry.getSecretKey();
+
+ Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.DECRYPT_MODE, key);
+
+ byte[] encryptedBytes = Base64.decodeBase64(authToken);
+
+ byte[] unencryptedBytes = cipher.doFinal(encryptedBytes);
+ String clearTextToken = new String( unencryptedBytes );
+
+ System.out.println( "clearTextToken: " + clearTextToken );
+ String[] tokenParts = clearTextToken.split( ":" );
+
+ if( tokenParts[0].equals( "OpenSOC_AuthToken" ))
+ {
+ long now = System.currentTimeMillis();
+ long tokenTime = Long.parseLong(tokenParts[1]);
+
+ if( now > (tokenTime + tokenMaxAgeInMilliseconds ))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+
+ public static void main( String[] args ) throws Exception
+ {
+
+ Options options = new Options();
+
+ options.addOption( "keystoreFile", true, "Keystore File" );
+ options.addOption( "keystorePassword", true, "Keystore Password" );
+ options.addOption( "authTokenAlias", true, "");
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine cmd = parser.parse( options, args);
+
+
+ try
+ {
+ KeyStore ks = KeyStore.getInstance("JCEKS");
+
+ String keystorePassword = cmd.getOptionValue("keystorePassword");
+ String keystoreFile = cmd.getOptionValue("keystoreFile");
+ String authTokenAlias = cmd.getOptionValue("authTokenAlias");
+
+ ks.load(null, keystorePassword.toCharArray());
+
+
+ // generate a key and store it in the keystore...
+ KeyGenerator keyGen = KeyGenerator.getInstance("AES");
+ SecretKey key = keyGen.generateKey();
+
+ KeyStore.ProtectionParameter protParam =
+ new KeyStore.PasswordProtection(keystorePassword.toCharArray());
+
+
+ KeyStore.SecretKeyEntry skEntry =
+ new KeyStore.SecretKeyEntry(key);
+
+ ks.setEntry(authTokenAlias, skEntry, protParam);
+
+ java.io.FileOutputStream fos = null;
+ try {
+
+ fos = new java.io.FileOutputStream(keystoreFile);
+ ks.store(fos, keystorePassword.toCharArray());
+ }
+ finally {
+
+ if (fos != null) {
+ fos.close();
+ }
+ }
+
+
+ System.out.println( "done" );
+
+ }
+ catch( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
new file mode 100644
index 0000000..103a4cc
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/AuthTokenFilter.java
@@ -0,0 +1,15 @@
+package com.opensoc.dataservices.auth;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.ws.rs.NameBinding;
+
+@Target({ ElementType.TYPE, ElementType.METHOD })
+@Retention(value = RetentionPolicy.RUNTIME)
+@NameBinding
+public @interface AuthTokenFilter {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
new file mode 100644
index 0000000..d7bffb2
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/CustomDomainADRealm.java
@@ -0,0 +1,34 @@
+package com.opensoc.dataservices.auth;
+
+import javax.naming.NamingException;
+
+import org.apache.shiro.authc.AuthenticationInfo;
+import org.apache.shiro.authc.AuthenticationToken;
+import org.apache.shiro.authc.UsernamePasswordToken;
+import org.apache.shiro.realm.activedirectory.ActiveDirectoryRealm;
+import org.apache.shiro.realm.ldap.LdapContextFactory;
+
+public class CustomDomainADRealm extends ActiveDirectoryRealm {
+
+ private String customDomain;
+
+ public void setCustomDomain(String customDomain) {
+ this.customDomain = customDomain;
+ }
+
+ public String getCustomDomain() {
+ return customDomain;
+ }
+
+ @Override
+ protected AuthenticationInfo queryForAuthenticationInfo(
+ AuthenticationToken token, LdapContextFactory ldapContextFactory)
+ throws NamingException {
+
+ UsernamePasswordToken upToken = (UsernamePasswordToken)token;
+ String userName = upToken.getUsername();
+ upToken.setUsername( userName + "@" + customDomain );
+
+ return super.queryForAuthenticationInfo(token, ldapContextFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
new file mode 100644
index 0000000..03e2c48
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/auth/RestSecurityInterceptor.java
@@ -0,0 +1,57 @@
+package com.opensoc.dataservices.auth;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Cookie;
+import javax.ws.rs.ext.Provider;
+
+import org.jboss.resteasy.core.Headers;
+import org.jboss.resteasy.core.ServerResponse;
+
+import com.google.inject.Inject;
+
+@AuthTokenFilter
+@Provider
+public class RestSecurityInterceptor implements javax.ws.rs.container.ContainerRequestFilter {
+
+ private static final ServerResponse ACCESS_DENIED = new ServerResponse("Access denied for this resource", 401, new Headers<Object>());;
+
+ @Inject
+ private Properties configProps;
+
+ @Override
+ public void filter(ContainerRequestContext requestContext) throws IOException {
+
+ // get our token...
+ Map<String, Cookie> cookies = requestContext.getCookies();
+
+ Cookie authTokenCookie = cookies.get( "authToken" );
+ if( authTokenCookie == null )
+ {
+ requestContext.abortWith(ACCESS_DENIED );
+ return;
+ }
+
+ String authToken = authTokenCookie.getValue();
+ try {
+
+ if( ! AuthToken.validateToken(configProps, authToken) )
+ {
+ requestContext.abortWith(ACCESS_DENIED );
+ return;
+ }
+ }
+ catch (Exception e) {
+
+ e.printStackTrace();
+ requestContext.abortWith(ACCESS_DENIED );
+ return;
+ }
+
+ // if the token is good, just return...
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
new file mode 100644
index 0000000..27b4cbf
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/common/OpenSOCService.java
@@ -0,0 +1,27 @@
+package com.opensoc.dataservices.common;
+
+public interface OpenSOCService {
+
+ //secure service that front elastic search or solr
+ //and the message broker
+
+ public String identify();
+ public boolean init(String topicname);
+ public boolean login();
+
+ //standing query operations
+ public boolean registerRulesFromFile();
+ public boolean registerRules();
+ public String viewRules();
+ public boolean editRules();
+ public boolean deleteRules();
+
+ //register for writing to kafka topic
+ public boolean registerForAlertsTopic(String topicname);
+
+ //client registers for alerts
+ public String receiveAlertAll();
+ public String receiveAlertLast();
+ public boolean disconnectFromAlertsTopic(String topicname);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
new file mode 100644
index 0000000..7874f19
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaClient.java
@@ -0,0 +1,83 @@
+package com.opensoc.dataservices.kafkaclient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.dataservices.Main;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaClient
+{
+ private static final Logger logger = LoggerFactory.getLogger( KafkaClient.class );
+
+ private final ConsumerConnector consumer;
+ private final String topic;
+ private ExecutorService executor;
+ private RemoteEndpoint remote;
+
+ public KafkaClient(String zooKeeper, String groupId, String topic, RemoteEndpoint remote)
+ {
+ this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+ createConsumerConfig(zooKeeper, groupId ));
+
+ this.topic = topic;
+ this.remote = remote;
+ }
+
+ private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", a_zookeeper);
+ props.put("group.id", a_groupId);
+ props.put("zookeeper.session.timeout.ms", "1000");
+ props.put("zookeeper.sync.time.ms", "1000");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("auto.offset.reset", "smallest");
+
+ return new ConsumerConfig(props);
+ }
+
+ public void shutdown() {
+
+ logger.info( "Client shutdown() method invoked" );
+
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ public void run(int a_numThreads) {
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(topic, new Integer(a_numThreads));
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+
+ logger.debug( "streams.size = " + streams.size() );
+
+ // now launch all the threads
+ //
+ executor = Executors.newFixedThreadPool(a_numThreads);
+
+ // now create an object to consume the messages
+ //
+ int threadNumber = 0;
+ for (final KafkaStream stream : streams) {
+ executor.submit(new KafkaConsumer(this.remote, stream, threadNumber));
+ threadNumber++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
new file mode 100644
index 0000000..0e01f1d
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/KafkaConsumer.java
@@ -0,0 +1,49 @@
+package com.opensoc.dataservices.kafkaclient;
+
+import java.io.IOException;
+
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.dataservices.Main;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+
+public class KafkaConsumer implements Runnable
+{
+ private static final Logger logger = LoggerFactory.getLogger( KafkaConsumer.class );
+
+ private KafkaStream m_stream;
+ private int m_threadNumber;
+ private RemoteEndpoint remote;
+
+ public KafkaConsumer( RemoteEndpoint remote, KafkaStream a_stream, int a_threadNumber)
+ {
+ this.m_threadNumber = a_threadNumber;
+ this.m_stream = a_stream;
+ this.remote = remote;
+ }
+
+ public void run()
+ {
+ logger.debug( "calling ConsumerTest.run()" );
+ ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
+
+ while (it.hasNext())
+ {
+ String message = new String(it.next().message());
+ try
+ {
+ remote.sendString( message );
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ logger.debug("Shutting down Thread: " + m_threadNumber);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
new file mode 100644
index 0000000..a8a0305
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaClient.java
@@ -0,0 +1,101 @@
+package com.opensoc.dataservices.kafkaclient.poll;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PollingKafkaClient
+{
+ private static final Logger logger = LoggerFactory.getLogger( PollingKafkaClient.class );
+
+ private final ConsumerConnector consumer;
+ private final String topic;
+ private ExecutorService executor;
+
+ public PollingKafkaClient(String zooKeeper, String groupId, String topic)
+ {
+ this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+ createConsumerConfig(zooKeeper, groupId ));
+
+ this.topic = topic;
+ }
+
+ private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", a_zookeeper);
+ props.put("group.id", a_groupId);
+ props.put( "zookeeper.session.timeout.ms", "1000");
+ props.put( "zookeeper.sync.time.ms", "200");
+ props.put( "auto.commit.interval.ms", "200");
+ props.put( "auto.offset.reset", "smallest");
+ // props.put( "fetch.min.bytes", "1" );
+ props.put( "consumer.timeout.ms", "1000" );
+
+ return new ConsumerConfig(props);
+ }
+
+ public void shutdown() {
+
+ logger.info( "Client shutdown() method invoked" );
+
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+
+ if (executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ public void run(int a_numThreads) {
+
+ }
+
+
+ public List fetchMessages()
+ {
+ List<String> messages = new ArrayList<String>();
+
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(topic, new Integer(1));
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+
+ logger.debug( "streams.size = " + streams.size() );
+
+ // now launch all the threads
+ //
+ executor = Executors.newFixedThreadPool(1);
+
+ // now create an object to consume the messages
+ //
+ int threadNumber = 0;
+ CountDownLatch latch = new CountDownLatch( streams.size() );
+
+ for (final KafkaStream stream : streams) {
+ executor.submit(new PollingKafkaConsumer(messages, stream, threadNumber, latch ));
+ threadNumber++;
+ }
+
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ // TODO: handle
+ }
+
+ return messages;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
new file mode 100644
index 0000000..ee59b1f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/kafkaclient/poll/PollingKafkaConsumer.java
@@ -0,0 +1,52 @@
+package com.opensoc.dataservices.kafkaclient.poll;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PollingKafkaConsumer implements Runnable
+{
+ private static final Logger logger = LoggerFactory.getLogger( PollingKafkaConsumer.class );
+
+ private KafkaStream m_stream;
+ private int m_threadNumber;
+ private List<String> messages;
+ private CountDownLatch latch;
+
+ public PollingKafkaConsumer( List<String> messages, KafkaStream a_stream, int a_threadNumber, CountDownLatch latch )
+ {
+ this.m_threadNumber = a_threadNumber;
+ this.m_stream = a_stream;
+ this.messages = messages;
+ this.latch = latch;
+ }
+
+ public void run()
+ {
+ logger.warn( "calling PollingKafkaConsumer.run()" );
+ ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
+
+ try
+ {
+ while (it.hasNext())
+ {
+ String message = new String(it.next().message());
+ logger.warn( "adding message: " + message);
+ messages.add(message);
+ }
+ }
+ catch( Exception e)
+ {
+ logger.error( "Exception waiting on Kafka...", e );
+ }
+
+ latch.countDown();
+
+ logger.warn("Shutting down Thread: " + m_threadNumber);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
new file mode 100644
index 0000000..bd741be
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/AlertsServerModule.java
@@ -0,0 +1,36 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.opensoc.alerts.server.AlertsCacheReaper;
+import com.opensoc.alerts.server.AlertsProcessingServer;
+import com.opensoc.alerts.server.AlertsSearcher;
+
+public class AlertsServerModule extends AbstractModule {
+
+ private static final Logger logger = LoggerFactory.getLogger( AlertsServerModule.class );
+
+ private Properties configProps;
+
+ public AlertsServerModule( final Properties configProps ) {
+ this.configProps = configProps;
+ }
+
+ @Override
+ protected void configure() {
+ bind( AlertsProcessingServer.class).in(Singleton.class);
+ bind( AlertsSearcher.class).in(Singleton.class);
+ bind( AlertsCacheReaper.class ).in(Singleton.class );
+ }
+
+ @Provides Properties getConfigProps()
+ {
+ return configProps;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
new file mode 100644
index 0000000..3e6d3b5
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultServletModule.java
@@ -0,0 +1,48 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.Properties;
+
+import org.apache.shiro.guice.web.ShiroWebModule;
+import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Singleton;
+import com.google.inject.servlet.ServletModule;
+import com.opensoc.dataservices.kafkaclient.KafkaConsumer;
+import com.opensoc.dataservices.servlet.LoginServlet;
+import com.opensoc.dataservices.servlet.LogoutServlet;
+import com.opensoc.dataservices.websocket.KafkaMessageSenderServlet;
+import com.opensoc.dataservices.websocket.KafkaWebSocketCreator;
+
+public class DefaultServletModule extends ServletModule {
+
+ private static final Logger logger = LoggerFactory.getLogger( DefaultServletModule.class );
+
+ private Properties configProps;
+
+ public DefaultServletModule( final Properties configProps ) {
+ this.configProps = configProps;
+ }
+
+ @Override
+ protected void configureServlets() {
+
+ ShiroWebModule.bindGuiceFilter(binder());
+
+ bind( KafkaWebSocketCreator.class ).in(Singleton.class);
+
+ bind( HttpServletDispatcher.class ).in(Singleton.class);
+ serve( "/rest/*").with(HttpServletDispatcher.class);
+
+ bind( KafkaMessageSenderServlet.class ).in(Singleton.class);
+ serve( "/ws/*").with(KafkaMessageSenderServlet.class );
+
+ bind( LoginServlet.class).in(Singleton.class);
+ serve( "/login" ).with( LoginServlet.class );
+
+ bind( LogoutServlet.class).in(Singleton.class);
+ serve( "/logout" ).with( LogoutServlet.class );
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
new file mode 100644
index 0000000..bce4fce
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/DefaultShiroWebModule.java
@@ -0,0 +1,90 @@
+package com.opensoc.dataservices.modules.guice;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.servlet.ServletContext;
+
+import org.apache.shiro.guice.web.ShiroWebModule;
+import org.apache.shiro.web.filter.authc.LogoutFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Provides;
+import com.google.inject.name.Names;
+import com.opensoc.dataservices.auth.CustomDomainADRealm;
+
+public class DefaultShiroWebModule extends ShiroWebModule {
+
+ private static final Logger logger = LoggerFactory.getLogger( DefaultShiroWebModule.class );
+
+ private Properties configProps;
+
+ public DefaultShiroWebModule(final ServletContext sc) {
+ super(sc);
+ }
+
+ public DefaultShiroWebModule(final Properties configProps, final ServletContext sc) {
+ super(sc);
+ this.configProps = configProps;
+ }
+
+ protected void configureShiroWeb() {
+ bindConstant().annotatedWith(Names.named("shiro.loginUrl")).to( "/login.jsp" );
+ bindRealm().to(CustomDomainADRealm.class);
+ bind( LogoutFilter.class);
+
+ addFilterChain("/login", ANON);
+ addFilterChain("/logout", ANON);
+ addFilterChain("/withsocket.jsp", AUTHC );
+ addFilterChain("/withsocket2.jsp", ANON );
+ }
+
+ @Provides
+ @javax.inject.Singleton
+ CustomDomainADRealm providesRealm()
+ {
+
+ CustomDomainADRealm realm = new CustomDomainADRealm();
+
+ String ldapUrl = configProps.getProperty("ldapUrl");
+ logger.info( "got ldapurl from config: " + ldapUrl );
+ realm.setUrl(ldapUrl);
+
+ // String ldapAuthMechanism = configProps.getProperty( "ldapAuthMechanism", "simple" ).trim();
+ // logger.info( "got ldapAuthMechanism from config: " + ldapAuthMechanism );
+
+
+ String activeDirectorySystemUsername = configProps.getProperty( "activeDirectorySystemUsername" ).trim();
+ logger.info( "got activeDirectorySystemUsername from config: " + activeDirectorySystemUsername );
+ realm.setSystemUsername(activeDirectorySystemUsername);
+
+ String activeDirectorySystemPassword = configProps.getProperty( "activeDirectorySystemPassword" ).trim();
+ logger.info( "got activeDirectorySystemPassword from config: " + activeDirectorySystemPassword );
+ realm.setSystemPassword(activeDirectorySystemPassword);
+
+ String adDomain = configProps.getProperty( "adDomain" ).trim();
+ realm.setCustomDomain( adDomain );
+
+ String activeDirectoryBaseSearchDN = configProps.getProperty( "activeDirectoryBaseSearchDN" ).trim();
+ logger.info( "got activeDirectoryBaseSearchDN from config: " + activeDirectoryBaseSearchDN );
+ realm.setSearchBase( activeDirectoryBaseSearchDN );
+
+ String groupRolesMapStr = configProps.getProperty( "groupRolesMap" );
+ logger.info( "got groupRolesMapStr from config: " + groupRolesMapStr );
+
+ String[] mappings = groupRolesMapStr.split( "\\|" );
+
+ Map<String,String> groupRolesMap = new HashMap<String, String>();
+ for( String mapping : mappings )
+ {
+ System.out.println( "mapping: " + mapping );
+ String[] mappingParts = mapping.split(":");
+ groupRolesMap.put( mappingParts[0], mappingParts[1]);
+ }
+
+ realm.setGroupRolesMap(groupRolesMap);
+ return realm;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
new file mode 100644
index 0000000..14dfdb8
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/RestEasyModule.java
@@ -0,0 +1,23 @@
+package com.opensoc.dataservices.modules.guice;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.opensoc.dataservices.auth.RestSecurityInterceptor;
+import com.opensoc.dataservices.kafkaclient.KafkaConsumer;
+import com.opensoc.dataservices.rest.Index;
+
+public class RestEasyModule extends AbstractModule {
+
+ private static final Logger logger = LoggerFactory.getLogger( RestEasyModule.class );
+
+ @Override
+ protected void configure() {
+
+ bind( Index.class );
+ bind( RestSecurityInterceptor.class );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
new file mode 100644
index 0000000..5271674
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/modules/guice/ServiceModule.java
@@ -0,0 +1,34 @@
+package com.opensoc.dataservices.modules.guice;
+
+import javax.inject.Singleton;
+
+import org.jboss.resteasy.plugins.guice.ext.RequestScopeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Provides;
+import com.opensoc.dataservices.common.OpenSOCService;
+import com.opensoc.dataservices.kafkaclient.KafkaConsumer;
+import com.opensoc.services.alerts.ElasticSearch_KafkaAlertsService;
+import com.opensoc.services.alerts.Solr_KafkaAlertsService;
+
+public class ServiceModule extends RequestScopeModule {
+
+ private static final Logger logger = LoggerFactory.getLogger( ServiceModule.class );
+
+ private String[] args;
+
+ public ServiceModule(String[] args) {
+ this.args = args;
+ }
+
+ @Provides
+ @Singleton
+ public OpenSOCService socservice() {
+ if (args.length > 0 && args[0].equals("ElasticSearch_KafkaAlertsService")) {
+ return new ElasticSearch_KafkaAlertsService();
+ } else {
+ return new Solr_KafkaAlertsService();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
new file mode 100644
index 0000000..d9916dc
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/Index.java
@@ -0,0 +1,53 @@
+package com.opensoc.dataservices.rest;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.opensoc.dataservices.auth.AuthTokenFilter;
+import com.opensoc.dataservices.kafkaclient.poll.PollingKafkaClient;
+
+@Path("/")
+public class Index
+{
+ private static final Logger logger = LoggerFactory.getLogger( Index.class );
+
+ @Inject
+ private Properties configProps;
+
+ @AuthTokenFilter
+ @GET
+ @Path("/alerts/{groupId}")
+ public Response getAlerts( @PathParam("groupId") String groupId )
+ {
+ String zooKeeperHost = configProps.getProperty( "kafkaZookeeperHost" );
+ logger.info( "kafkaZookeeperHost: " + zooKeeperHost );
+ String zooKeeperPort = configProps.getProperty( "kafkaZookeeperPort" );
+ logger.info( "kafkaZookeeperPort: " + zooKeeperPort );
+
+ logger.warn( "got groupId from path as: " + groupId );
+
+ PollingKafkaClient client = new PollingKafkaClient( zooKeeperHost + ":" + zooKeeperPort, groupId, "test");
+ List<String> messages = client.fetchMessages();
+ logger.warn( "found " + messages.size() + " messages in Kafka" );
+
+ String respString1 = "<html><body><h2>Messages:</h2><ul>";
+ String respString2 = "</ul></body></html>";
+
+ for( String msg : messages )
+ {
+ respString1 = respString1 + "<li>" + msg + "</li>";
+ }
+
+ return Response.status(200).entity( respString1 + respString2 ).build();
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
new file mode 100644
index 0000000..650b6d4
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/rest/RestServices.java
@@ -0,0 +1,35 @@
+package com.opensoc.dataservices.rest;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.core.Application;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.dataservices.kafkaclient.KafkaConsumer;
+
+public class RestServices extends Application
+{
+ private static final Logger logger = LoggerFactory.getLogger( RestServices.class );
+
+ private static Set services = new HashSet();
+
+ public RestServices()
+ {
+ // initialize restful services
+ services.add(new Index());
+ }
+
+ @Override
+ public Set getSingletons()
+ {
+ return services;
+ }
+
+ public static Set getServices()
+ {
+ return services;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
new file mode 100644
index 0000000..56d2c3e
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataServices/src/main/java/com/opensoc/dataservices/servlet/LoginServlet.java
@@ -0,0 +1,113 @@
+package com.opensoc.dataservices.servlet;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.shiro.SecurityUtils;
+import org.apache.shiro.authc.AuthenticationException;
+import org.apache.shiro.authc.ExcessiveAttemptsException;
+import org.apache.shiro.authc.IncorrectCredentialsException;
+import org.apache.shiro.authc.LockedAccountException;
+import org.apache.shiro.authc.UnknownAccountException;
+import org.apache.shiro.authc.UsernamePasswordToken;
+import org.apache.shiro.subject.Subject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.opensoc.dataservices.auth.AuthToken;
+
+public class LoginServlet extends HttpServlet
+{
+ private static final Logger logger = LoggerFactory.getLogger( LoginServlet.class );
+
+ private static final long serialVersionUID = 1L;
+
+ @Inject
+ private Properties configProps;
+
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
+ {
+ doPost( req, resp );
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
+ {
+ String username = req.getParameter("username" );
+ String password = req.getParameter("password" );
+ UsernamePasswordToken token = new UsernamePasswordToken(username, password);
+
+ logger.info( "Doing login for user: " + username );
+
+ Subject currentUser = SecurityUtils.getSubject();
+
+ try
+ {
+ currentUser.login(token);
+ }
+ catch ( UnknownAccountException uae )
+ {
+ logger.warn( "Failing login with 401:", uae );
+ resp.sendError(405);
+ return;
+ }
+ catch ( IncorrectCredentialsException ice )
+ {
+ logger.warn( "Failing login with 401:", ice );
+ resp.sendError(405);
+ return;
+ }
+ catch ( LockedAccountException lae )
+ {
+ logger.warn( "Failing login with 401:", lae );
+ resp.sendError(401);
+ return;
+ }
+ catch ( ExcessiveAttemptsException eae )
+ {
+ logger.warn( "Failing login with 401:", eae );
+ resp.sendError(401);
+ return;
+ }
+ catch ( AuthenticationException ae )
+ {
+ logger.warn( "Failing login with 401:", ae );
+ resp.sendError(401);
+ return;
+ }
+
+
+ if( currentUser.hasRole("ShiroUsersRole") )
+ {
+ try
+ {
+
+ Cookie authTokenCookie = new Cookie("authToken", AuthToken.generateToken(configProps));
+ resp.addCookie(authTokenCookie);
+
+ // resp.setStatus(HttpServletResponse.SC_OK);
+ resp.sendRedirect( "/withsocket.jsp" );
+ }
+ catch( Exception e )
+ {
+ logger.error( "Failed creating authToken cookie.", e );
+ resp.sendError( 500 );
+ return;
+ }
+ }
+ else
+ {
+ logger.error("User does not have required role!");
+ resp.sendError(401);
+ return;
+ }
+ }
+}
\ No newline at end of file