You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/16 18:10:20 UTC
[4/5] incubator-metron git commit: METRON-35 Implement threat
intelligence message enrichment closes apache/incubator-metron#22
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
new file mode 100644
index 0000000..b13e780
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
@@ -0,0 +1,75 @@
+package org.apache.metron.threatintel;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKey;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public class ThreatIntelKey implements LookupKey{
+ private static final int SEED = 0xDEADBEEF;
+ private static final int HASH_PREFIX_SIZE=16;
+ ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+ @Override
+ protected HashFunction initialValue() {
+ return Hashing.murmur3_128(SEED);
+ }
+ };
+ public ThreatIntelKey() {
+
+ }
+ public ThreatIntelKey(String indicator) {
+ this.indicator = indicator;
+ }
+
+ public String indicator;
+
+ @Override
+ public byte[] toBytes() {
+ byte[] indicatorBytes = Bytes.toBytes(indicator);
+ Hasher hasher = hFunction.get().newHasher();
+ hasher.putBytes(Bytes.toBytes(indicator));
+ byte[] prefix = hasher.hash().asBytes();
+ byte[] val = new byte[indicatorBytes.length + prefix.length];
+ int pos = 0;
+ for(int i = 0;pos < prefix.length;++pos,++i) {
+ val[pos] = prefix[i];
+ }
+ for(int i = 0;i < indicatorBytes.length;++pos,++i) {
+ val[pos] = indicatorBytes[i];
+ }
+ return val;
+ }
+
+ public static ThreatIntelKey fromBytes(byte[] row) {
+ ThreatIntelKey key = new ThreatIntelKey();
+ key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ThreatIntelKey that = (ThreatIntelKey) o;
+
+ return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return indicator != null ? indicator.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "ThreatIntelKey{" +
+ "indicator='" + indicator + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
new file mode 100644
index 0000000..bfe20b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
@@ -0,0 +1,55 @@
+package org.apache.metron.threatintel;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/3/16.
+ */
+public class ThreatIntelResults {
+ private ThreatIntelKey key;
+ private Map<String, String> value;
+ public ThreatIntelResults() {
+ key = new ThreatIntelKey();
+ value = new HashMap<>();
+ }
+ public ThreatIntelResults(ThreatIntelKey key, Map<String, String> value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public ThreatIntelKey getKey() {
+ return key;
+ }
+
+ public Map<String, String> getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ThreatIntelResults that = (ThreatIntelResults) o;
+
+ if (getKey() != null ? !getKey().equals(that.getKey()) : that.getKey() != null) return false;
+ return getValue() != null ? getValue().equals(that.getValue()) : that.getValue() == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getKey() != null ? getKey().hashCode() : 0;
+ result = 31 * result + (getValue() != null ? getValue().hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ThreatIntelResults{" +
+ "key=" + key +
+ ", value=" + value +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
new file mode 100644
index 0000000..a3d94e9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
@@ -0,0 +1,82 @@
+package org.apache.metron.threatintel.hbase;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+public enum Converter {
+ INSTANCE;
+ public static final String VALUE_COLUMN_NAME = "v";
+ public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
+ public static final String LAST_SEEN_COLUMN_NAME = "t";
+ public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
+ private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+ @Override
+ protected ObjectMapper initialValue() {
+ return new ObjectMapper();
+ }
+ };
+ public Put toPut(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
+ Put put = new Put(key.toBytes());
+ byte[] cf = Bytes.toBytes(columnFamily);
+ put.add(cf,VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(value)));
+ put.add(cf, LAST_SEEN_COLUMN_NAME_B, Bytes.toBytes(lastSeenTimestamp));
+ return put;
+ }
+
+ public Map.Entry<ThreatIntelResults, Long> fromPut(Put put, String columnFamily) throws IOException {
+ ThreatIntelKey key = ThreatIntelKey.fromBytes(put.getRow());
+ Map<String, String> value = null;
+ Long lastSeen = null;
+ byte[] cf = Bytes.toBytes(columnFamily);
+ List<Cell> cells = put.getFamilyCellMap().get(cf);
+ for(Cell cell : cells) {
+ if(Bytes.equals(cell.getQualifier(), VALUE_COLUMN_NAME_B)) {
+ value = stringToValue(Bytes.toString(cell.getValue()));
+ }
+ else if(Bytes.equals(cell.getQualifier(), LAST_SEEN_COLUMN_NAME_B)) {
+ lastSeen = Bytes.toLong(cell.getValue());
+ }
+ }
+ return new AbstractMap.SimpleEntry<>(new ThreatIntelResults(key, value), lastSeen);
+ }
+
+ public Result toResult(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
+ Put put = toPut(columnFamily, key, value, lastSeenTimestamp);
+ return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
+ }
+
+ public Map.Entry<ThreatIntelResults, Long> fromResult(Result result, String columnFamily) throws IOException {
+ ThreatIntelKey key = ThreatIntelKey.fromBytes(result.getRow());
+ byte[] cf = Bytes.toBytes(columnFamily);
+ NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
+ Map<String, String> value = stringToValue(Bytes.toString(cols.get(VALUE_COLUMN_NAME_B)));
+ ThreatIntelResults results = new ThreatIntelResults(key, value);
+ return new AbstractMap.SimpleEntry<>(results, Bytes.toLong(cols.get(LAST_SEEN_COLUMN_NAME_B)));
+ }
+
+ public Get toGet(String columnFamily, ThreatIntelKey key) {
+ Get ret = new Get(key.toBytes());
+ ret.addFamily(Bytes.toBytes(columnFamily));
+ return ret;
+ }
+
+ public Map<String, String> stringToValue(String s) throws IOException {
+ return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+ }
+ public String valueToString(Map<String, String> value) throws IOException {
+ return _mapper.get().writeValueAsString(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
new file mode 100644
index 0000000..db4d2fd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
@@ -0,0 +1,60 @@
+package org.apache.metron.threatintel.hbase;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.handler.Handler;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/5/16.
+ */
+public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> implements AutoCloseable {
+
+
+
+ public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> {
+ String columnFamily;
+ public Handler(String columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+ @Override
+ public boolean exists(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+ return table.exists(Converter.INSTANCE.toGet(columnFamily, key));
+ }
+
+ @Override
+ public Map.Entry<ThreatIntelResults,Long> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+ return Converter.INSTANCE.fromResult(table.get(Converter.INSTANCE.toGet(columnFamily, key)), columnFamily);
+ }
+
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+ private HTableInterface table;
+ public ThreatIntelLookup(HTableInterface table, String columnFamily, AccessTracker tracker) {
+ this.table = table;
+ this.setLookupHandler(new Handler(columnFamily));
+ this.setAccessTracker(tracker);
+ }
+
+ public HTableInterface getTable() {
+ return table;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ table.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index 1e0ea0f..c2a76fa 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -1,106 +1,241 @@
<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software
- Foundation (ASF) under one or more contributor license agreements. See the
- NOTICE file distributed with this work for additional information regarding
- copyright ownership. The ASF licenses this file to You under the Apache License,
- Version 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License. -->
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Streaming</artifactId>
- <version>0.6BETA</version>
- </parent>
- <artifactId>Metron-DataLoads</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${global_hbase_version}</version>
- <exclusions>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <sourceDirectory>src</sourceDirectory>
- <resources>
- <resource>
- <directory>src</directory>
- <excludes>
- <exclude>**/*.java</exclude>
- </excludes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>true</minimizeJar>
- <artifactSet>
- <excludes>
- <exclude>classworlds:classworlds</exclude>
- <exclude>junit:junit</exclude>
- <exclude>jmock:*</exclude>
- <exclude>*:xml-apis</exclude>
- <exclude>*slf4j*</exclude>
- <exclude>org.apache.maven:lib:tests</exclude>
- <exclude>log4j:log4j:jar:</exclude>
- <exclude>*:hbase:*</exclude>
- <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
- </excludes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>Metron-DataLoads</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mitre</groupId>
+ <artifactId>stix</artifactId>
+ <version>1.2.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.googlecode.disruptor</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!--dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.googlecode.disruptor</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency-->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${global_opencsv_version}</version>
+ </dependency>
-</project>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!--dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ </dependency-->
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${global_slf4j_version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <!-- Separates the unit tests from the integration tests. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
+ <configuration>
+ <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <skip>true</skip>
+ <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+ <trimStackTrace>false</trimStackTrace>
+
+ </configuration>
+ <executions>
+ <execution>
+ <id>unit-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include unit tests within integration-test phase. -->
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- Exclude integration tests within (unit) test phase. -->
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the integration-test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include integration tests within integration-test phase. -->
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+ </relocation>
+ </relocations>
+ <minimizeJar>true</minimizeJar>
+ <artifactSet>
+ <excludes>
+ <exclude>classworlds:classworlds</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>jmock:*</exclude>
+ <exclude>*:xml-apis</exclude>
+ <exclude>*slf4j*</exclude>
+ <exclude>org.apache.maven:lib:tests</exclude>
+ <exclude>log4j:log4j:jar:</exclude>
+ <exclude>*:hbase:*</exclude>
+ <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml b/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..44d7216
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/assembly/assembly.xml
@@ -0,0 +1,28 @@
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/bash</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/target</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ <outputDirectory>/lib</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
new file mode 100755
index 0000000..430c48c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.6BETA/lib/Metron-DataLoads-0.6BETA.jar org.apache.metron.dataloads.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
new file mode 100755
index 0000000..e21c9e1
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.6BETA/lib/Metron-DataLoads-0.6BETA.jar org.apache.metron.dataloads.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..899e13d
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
@@ -0,0 +1,207 @@
+package org.apache.metron.dataloads;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Created by cstella on 2/5/16.
+ */
+public class LeastRecentlyUsedPruner {
+ private static abstract class OptionHandler implements Function<String, Option> {}
+ private enum BulkLoadOptions {
+ HELP("h", new OptionHandler() {
+
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ return new Option(s, "help", false, "Generate Help screen");
+ }
+ }), TABLE("t", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "table", true, "HBase table to prune");
+ o.setRequired(true);
+ o.setArgName("HBASE_TABLE");
+ return o;
+ }
+ }), COLUMN_FAMILY("f", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+ o.setRequired(false);
+ o.setArgName("CF_NAME");
+ return o;
+ }
+ })
+ ,AS_OF_TIME("a", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+ o.setArgName("datetime");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+ Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+ o.setArgName("format");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ,ACCESS_TABLE("u", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+ o.setRequired(true);
+ o.setArgName("HBASE_TABLE");
+ return o;
+ }
+ }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+ o.setRequired(true);
+ o.setArgName("CF_NAME");
+ return o;
+ }
+ });
+ Option option;
+ String shortCode;
+ BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+ this.shortCode = shortCode;
+ this.option = optionHandler.apply(shortCode);
+ }
+
+ public boolean has(CommandLine cli) {
+ return cli.hasOption(shortCode);
+ }
+
+ public String get(CommandLine cli) {
+ return cli.getOptionValue(shortCode);
+ }
+ private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+ Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+ return d.getTime();
+ }
+
+ private static DateFormat getFormat(CommandLine cli) {
+ DateFormat format = new SimpleDateFormat();
+ if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+ format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+ }
+ return format;
+ }
+
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+ try {
+ CommandLine cli = parser.parse(getOptions(), args);
+ if(BulkLoadOptions.HELP.has(cli)) {
+ printHelp();
+ System.exit(0);
+ }
+ return cli;
+ } catch (ParseException e) {
+ System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+ e.printStackTrace(System.err);
+ printHelp();
+ System.exit(-1);
+ return null;
+ }
+ }
+
+ public static void printHelp() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+ }
+
+ public static Options getOptions() {
+ Options ret = new Options();
+ for(BulkLoadOptions o : BulkLoadOptions.values()) {
+ ret.addOption(o.option);
+ }
+ return ret;
+ }
+ }
+
+ public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+ Scan scan = new Scan();
+ if(cf != null) {
+ scan.addFamily(Bytes.toBytes(cf));
+ }
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+// set other scan attrs
+
+ TableMapReduceUtil.initTableMapperJob(
+ sourceTable, // input table
+ scan, // Scan instance to control CF and attribute selection
+ PrunerMapper.class, // mapper class
+ null, // mapper output key
+ null, // mapper output value
+ job);
+ TableMapReduceUtil.initTableReducerJob(
+ sourceTable, // output table
+ null, // reducer class
+ job);
+ }
+
+ public static Job createJob( Configuration conf
+ , String table
+ , String cf
+ , String accessTrackerTable
+ , String accessTrackerColumnFamily
+ , Long ts
+ ) throws IOException
+ {
+ Job job = new Job(conf);
+ job.setJobName("LeastRecentlyUsedPruner: Pruning " + table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+ System.out.println("Configuring " + job.getJobName());
+ job.setJarByClass(LeastRecentlyUsedPruner.class);
+ job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+ setupHBaseJob(job, table, cf);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+ Configuration conf = HBaseConfiguration.create();
+ String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+ CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+ Long ts = BulkLoadOptions.getTimestamp(cli);
+ String table = BulkLoadOptions.TABLE.get(cli);
+ String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+ String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+ String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+ Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..d83b35a
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
@@ -0,0 +1,199 @@
+package org.apache.metron.dataloads;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * Command-line tool that bulk loads threat intelligence indicators from HDFS
+ * into an HBase table via a map-only MapReduce job.  Parsing of the input is
+ * delegated to an extractor described by a JSON config file (see
+ * {@link ExtractorHandler}).
+ */
+public class ThreatIntelBulkLoader {
+    /** Factory for a commons-cli Option, keyed by its short code. */
+    private static abstract class OptionHandler implements Function<String, Option> {}
+
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        })
+        ,TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to import data into");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        })
+        ,COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+                o.setArgName("JSON_FILE");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,INPUT_DATA("i", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+                o.setArgName("DIR");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+                o.setArgName("datetime");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        // BUG FIX: the short code here was "t", colliding with TABLE's short
+        // code and making the two options ambiguous to commons-cli.  "z" is
+        // unused by any other option in this enum.
+        ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ;
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        /** @return true if this option was supplied on the command line. */
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        /** @return the value supplied for this option, or null if absent. */
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+
+        /**
+         * Parses the command line; prints help and exits on -h or on a parse
+         * failure (so the null return after System.exit is unreachable).
+         */
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+                ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+
+    /**
+     * Resolves the "last seen" timestamp: the as_of option parsed with its
+     * companion format, or the current time when as_of is omitted.
+     *
+     * @throws IllegalStateException if as_of is given without as_of_format
+     */
+    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+        if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
+            if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+            }
+            else {
+                DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+                Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+                return d.getTime();
+            }
+        }
+        else {
+            return System.currentTimeMillis();
+        }
+    }
+
+    /** Reads the extractor JSON config file into a single string. */
+    private static String readExtractorConfig(File configFile) throws IOException {
+        return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+    }
+
+    /**
+     * Configures the map-only bulk-load job: BulkLoadMapper emits Puts that
+     * the TableOutputFormat writes straight into the target table/CF.
+     */
+    public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts) throws IOException {
+        Job job = new Job(conf);
+        job.setJobName("ThreatIntelBulkLoader: " + input + " => " + table + ":" + cf);
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(ThreatIntelBulkLoader.class);
+        job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+        job.setOutputFormatClass(TableOutputFormat.class);
+        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+        job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+        job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+        job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+        // Map-only: each mapper writes Puts directly to HBase.
+        job.setNumReduceTasks(0);
+        // The extractor's input-format handler wires up the job's input paths.
+        ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+        handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+        Configuration conf = HBaseConfiguration.create();
+        // Strip generic Hadoop options before parsing our own CLI.
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = getTimestamp(cli);
+        String input = BulkLoadOptions.INPUT_DATA.get(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+        Job job = createJob(conf, input, table, cf, extractorConfigContents, ts);
+        // (dropped a leftover debug println of the raw Configuration)
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
new file mode 100644
index 0000000..4fcfb15
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -0,0 +1,14 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * Strategy for turning one raw input record into threat intel results.
+ */
+public interface Extractor {
+ /**
+  * Extracts zero or more threat intel results from a single input record.
+  *
+  * @param line one record of the feed (e.g. a CSV line or a whole STIX XML doc)
+  * @throws IOException if the record cannot be parsed
+  */
+ Iterable<ThreatIntelResults> extract(String line) throws IOException;
+ /**
+  * Called once with the "config" map from the extractor JSON document,
+  * before any calls to extract.
+  */
+ void initialize(Map<String, Object> config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
new file mode 100644
index 0000000..fdc619e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
@@ -0,0 +1,10 @@
+package org.apache.metron.dataloads.extractor;
+
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * Factory for Extractor instances; implemented by the Extractors enum so each
+ * built-in extractor type knows how to construct itself.
+ */
+public interface ExtractorCreator {
+ /** @return a new, uninitialized Extractor instance. */
+ Extractor create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
new file mode 100644
index 0000000..cc63c14
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -0,0 +1,73 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.inputformat.Formats;
+import org.apache.metron.dataloads.extractor.inputformat.InputFormatHandler;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * In-memory representation of the extractor JSON configuration.  Jackson binds
+ * the "extractor", "inputFormatHandler" and "config" entries of the document;
+ * load(...) additionally initializes the extractor with the config map.
+ */
+public class ExtractorHandler {
+    final static ObjectMapper _mapper = new ObjectMapper();
+    private Map<String, Object> config;
+    private Extractor extractor;
+    // Line-oriented input unless the config specifies otherwise.
+    private InputFormatHandler inputFormatHandler = Formats.BY_LINE;
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    public InputFormatHandler getInputFormatHandler() {
+        return inputFormatHandler;
+    }
+
+    /** Resolves the handler by name via Formats (enum name or class name). */
+    public void setInputFormatHandler(String handler) {
+        try {
+            this.inputFormatHandler = Formats.create(handler);
+        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+            // Java 7 multi-catch replaces three identical catch blocks.
+            throw new IllegalStateException("Unable to create an inputformathandler", e);
+        }
+    }
+
+    public Extractor getExtractor() {
+        return extractor;
+    }
+
+    /** Resolves the extractor by name via Extractors (enum name or class name). */
+    public void setExtractor(String extractor) {
+        try {
+            this.extractor = Extractors.create(extractor);
+        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+            throw new IllegalStateException("Unable to create an extractor", e);
+        }
+    }
+
+    /** Parses the JSON config from a stream and initializes the extractor. */
+    public static synchronized ExtractorHandler load(InputStream is) throws IOException {
+        ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
+        ret.getExtractor().initialize(ret.getConfig());
+        return ret;
+    }
+    public static synchronized ExtractorHandler load(String s, Charset c) throws IOException {
+        return load( new ByteArrayInputStream(s.getBytes(c)));
+    }
+    public static synchronized ExtractorHandler load(String s) throws IOException {
+        return load( s, Charset.defaultCharset());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
new file mode 100644
index 0000000..b11ccd9
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
@@ -0,0 +1,44 @@
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * Registry of built-in extractor implementations, keyed by enum name.
+ */
+public enum Extractors implements ExtractorCreator {
+ CSV(new ExtractorCreator() {
+
+ @Override
+ public Extractor create() {
+ return new CSVExtractor();
+ }
+ })
+ ,STIX(new ExtractorCreator() {
+ @Override
+ public Extractor create() {
+ return new StixExtractor();
+ }
+ })
+ ;
+ ExtractorCreator _creator;
+ Extractors(ExtractorCreator creator) {
+ this._creator = creator;
+ }
+ @Override
+ public Extractor create() {
+ return _creator.create();
+ }
+ /**
+  * Creates an extractor either by enum name (e.g. "CSV") or, failing that,
+  * by treating the argument as a fully-qualified class name to instantiate
+  * reflectively.
+  */
+ public static Extractor create(String extractorName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ try {
+ ExtractorCreator ec = Extractors.valueOf(extractorName);
+ return ec.create();
+ }
+ catch(IllegalArgumentException iae) {
+ // Not a built-in name; fall back to reflective instantiation.
+ Extractor ex = (Extractor) Class.forName(extractorName).newInstance();
+ return ex;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
new file mode 100644
index 0000000..ee85a58
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -0,0 +1,99 @@
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.threatintel.ThreatIntelResults;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/2/16.
+ */
+/**
+ * Extractor that parses delimited (CSV-style) threat intel feeds.  The config
+ * must supply "columns" (string "name[:pos],...", list, or name->pos map) and
+ * may supply "indicator_column" (which named column holds the indicator) and
+ * "separator" (defaults to ',').
+ */
+public class CSVExtractor implements Extractor {
+    public static final String COLUMNS_KEY="columns";
+    public static final String INDICATOR_COLUMN_KEY="indicator_column";
+    public static final String SEPARATOR_KEY="separator";
+
+    // Index of the column holding the indicator (0 if not configured).
+    private int indicatorColumn;
+    // column name -> position within the parsed line
+    private Map<String, Integer> columnMap = new HashMap<>();
+    private CSVParser parser;
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+        if(line.trim().startsWith("#")) {
+            //comment
+            return Collections.emptyList();
+        }
+        ThreatIntelResults ret = new ThreatIntelResults();
+        String[] tokens = parser.parseLine(line);
+        ret.getKey().indicator = tokens[indicatorColumn];
+        for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+            ret.getValue().put(kv.getKey(), tokens[kv.getValue()]);
+        }
+        return Arrays.asList(ret);
+    }
+
+    /** Parses "name:pos" into an explicit mapping; otherwise maps name to i. */
+    private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+        if(column.contains(":")) {
+            Iterable<String> tokens = Splitter.on(':').split(column);
+            String col = Iterables.getFirst(tokens, null);
+            Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+            return new AbstractMap.SimpleEntry<>(col, pos);
+        }
+        else {
+            return new AbstractMap.SimpleEntry<>(column, i);
+        }
+
+    }
+
+    /** Builds name->position from the three accepted "columns" shapes. */
+    private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+        Map<String, Integer> columnMap = new HashMap<>();
+        if(config.containsKey(COLUMNS_KEY)) {
+            Object columnsObj = config.get(COLUMNS_KEY);
+            if(columnsObj instanceof String) {
+                String columns = (String)columnsObj;
+                int i = 0;
+                for (String column : Splitter.on(',').split(columns)) {
+                    Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+                    columnMap.put(e.getKey(), e.getValue());
+                }
+            }
+            else if(columnsObj instanceof List) {
+                List columns = (List)columnsObj;
+                int i = 0;
+                for(Object column : columns) {
+                    Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+                    columnMap.put(e.getKey(), e.getValue());
+                }
+            }
+            else if(columnsObj instanceof Map) {
+                Map<Object, Object> map = (Map<Object, Object>)columnsObj;
+                for(Map.Entry<Object, Object> e : map.entrySet()) {
+                    columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+                }
+            }
+        }
+        return columnMap;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+        if(config.containsKey(COLUMNS_KEY)) {
+            columnMap = getColumnMap(config);
+        }
+        else {
+            throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+        }
+        if(config.containsKey(INDICATOR_COLUMN_KEY)) {
+            String indicatorCol = config.get(INDICATOR_COLUMN_KEY).toString();
+            Integer idx = columnMap.get(indicatorCol);
+            // BUG FIX: an unknown column name previously caused an opaque NPE
+            // on auto-unboxing; fail fast with a clear message instead.
+            if(idx == null) {
+                throw new IllegalStateException(INDICATOR_COLUMN_KEY + " \"" + indicatorCol + "\" is not one of the configured " + COLUMNS_KEY);
+            }
+            indicatorColumn = idx;
+        }
+        // BUG FIX: the parser was only constructed when "separator" was
+        // supplied, so extract() threw NullPointerException otherwise.
+        // Default to a comma separator when none is configured.
+        char separator = ',';
+        if(config.containsKey(SEPARATOR_KEY)) {
+            separator = config.get(SEPARATOR_KEY).toString().charAt(0);
+        }
+        parser = new CSVParserBuilder().withSeparator(separator)
+                .build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
new file mode 100644
index 0000000..9fb2231
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -0,0 +1,41 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+/**
+ * Registry of built-in input-format handlers, keyed by enum name.
+ */
+public enum Formats implements InputFormatHandler{
+ BY_LINE(new InputFormatHandler() {
+ @Override
+ public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+
+ FileInputFormat.addInputPath(job, input);
+ }
+ })
+ ;
+ InputFormatHandler _handler = null;
+ Formats(InputFormatHandler handler) {
+ this._handler = handler;
+ }
+ @Override
+ public void set(Job job, Path path, Map<String, Object> config) throws IOException {
+ _handler.set(job, path, config);
+ }
+
+ /**
+  * Creates a handler either by enum name (e.g. "BY_LINE") or, failing that,
+  * by treating the argument as a fully-qualified class name to instantiate
+  * reflectively.
+  */
+ public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ try {
+ InputFormatHandler ec = Formats.valueOf(handlerName);
+ return ec;
+ }
+ catch(IllegalArgumentException iae) {
+ // Not a built-in name; fall back to reflective instantiation.
+ InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).newInstance();
+ return ex;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
new file mode 100644
index 0000000..28cd241
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -0,0 +1,14 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+/**
+ * Strategy for wiring a MapReduce job's input: implementations add the input
+ * path and, where needed, set the job's InputFormat class.
+ */
+public interface InputFormatHandler {
+ /**
+  * Configures the job's input.
+  *
+  * @param job the job being configured
+  * @param input the HDFS input path
+  * @param config the extractor config map (implementation-specific use)
+  * @throws IOException if the input path cannot be added
+  */
+ void set(Job job, Path input, Map<String, Object> config) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
new file mode 100644
index 0000000..6c694f7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -0,0 +1,95 @@
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+/**
+ * InputFormatHandler that feeds each input file to the mapper as a single
+ * record: key NullWritable, value the entire file contents as Text.
+ */
+public class WholeFileFormat implements InputFormatHandler {
+
+    /** Reads one entire (non-split) file as a single key/value pair. */
+    public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+        private FileSplit fileSplit;
+        private Configuration conf;
+        private Text value = new Text();
+        // true once the single record has been produced
+        private boolean processed = false;
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            this.fileSplit = (FileSplit) split;
+            this.conf = context.getConfiguration();
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (!processed) {
+                Path file = fileSplit.getPath();
+                // BUG FIX: guard the long->int narrowing; files larger than
+                // 2GB previously truncated silently via the cast.
+                if (fileSplit.getLength() > Integer.MAX_VALUE) {
+                    throw new IOException("File too large to read as a single record: " + file);
+                }
+                byte[] contents = new byte[(int) fileSplit.getLength()];
+                FileSystem fs = file.getFileSystem(conf);
+                FSDataInputStream in = null;
+                try {
+                    in = fs.open(file);
+                    IOUtils.readFully(in, contents, 0, contents.length);
+                    value.set(contents, 0, contents.length);
+                } finally {
+                    IOUtils.closeStream(in);
+                }
+                processed = true;
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public NullWritable getCurrentKey() throws IOException, InterruptedException {
+            return NullWritable.get();
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException{
+            return value;
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            // Single-record reader: either nothing or everything has been read.
+            return processed ? 1.0f : 0.0f;
+        }
+
+        @Override
+        public void close() throws IOException{
+            //do nothing :)
+        }
+    }
+
+    /** FileInputFormat that never splits files. */
+    public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+
+        @Override
+        protected boolean isSplitable(JobContext context, Path file) {
+            return false;
+        }
+
+        @Override
+        public RecordReader<NullWritable, Text> createRecordReader(
+                InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+            WholeFileRecordReader reader = new WholeFileRecordReader();
+            reader.initialize(split, context);
+            return reader;
+        }
+    }
+
+    @Override
+    public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+        WholeFileInputFormat.setInputPaths(job, input);
+        job.setInputFormatClass(WholeFileInputFormat.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
new file mode 100644
index 0000000..493adb6
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -0,0 +1,96 @@
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.*;
+import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.stix.common_1.IndicatorBaseType;
+import org.mitre.stix.indicator_2.Indicator;
+import org.mitre.stix.stix_1.STIXPackage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/8/16.
+ */
+/**
+ * Extractor that pulls threat intel indicators out of a STIX XML document.
+ * Each observable's properties are dispatched to a type-specific handler
+ * (Address, DomainName, ...) when one is registered.
+ */
+public class StixExtractor implements Extractor {
+    Map<String, Object> config;
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+        STIXPackage stixPackage = STIXPackage.fromXMLString(line);
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        if (stixPackage.getIndicators() != null) {
+            if (stixPackage.getIndicators().getIndicators() != null) {
+                List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
+                int indicatorCount = indicators.size();
+                for (int i = 0; i < indicatorCount; i++) {
+                    Indicator indicator = (Indicator) indicators.get(i);
+                    if (indicator.getObservable() != null) {
+                        ObjectType obj = indicator.getObservable().getObject();
+                        ObjectPropertiesType props = obj.getProperties();
+                        // Unhandled object types are skipped rather than failing the load.
+                        ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+                        if(handler != null) {
+                            Iterables.addAll(ret, handler.extract(props, config));
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void initialize(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /**
+     * Splits a STIX string property into its individual values.  A property
+     * yields multiple tokens only for an EQUALS condition applied with ANY and
+     * carrying a delimiter; a delimiter-less EQUALS/ANY yields the raw value,
+     * and any other condition yields no tokens.
+     */
+    public static Iterable<String> split(StringObjectPropertyType value) {
+        final ConditionTypeEnum condition = value.getCondition();
+        final ConditionApplicationEnum applyCondition = value.getApplyCondition();
+        List<String> tokens = new ArrayList<>();
+        if(condition == ConditionTypeEnum.EQUALS && applyCondition == ConditionApplicationEnum.ANY) {
+            String delim = value.getDelimiter();
+            String line = value.getValue().toString();
+            if (delim != null) {
+                for (String token : Splitter.on(delim).split(line)) {
+                    tokens.add(token);
+                }
+            } else {
+                tokens.add(line);
+            }
+        }
+        return tokens;
+    }
+
+    /**
+     * Ad-hoc smoke test: extracts indicators from the file named by args[0]
+     * and prints them.
+     */
+    public static void main(String[] args) throws IOException {
+        // BUG FIX: the input path was hard-coded (with a dead commented-out
+        // args block); honor a command-line argument and keep the old path
+        // as the default for backward compatibility.
+        File file = args.length > 0 ? new File(args[0]) : new File("/tmp/sample.xml");
+        String line = FileUtils.readFileToString(file);
+        StixExtractor extractor = new StixExtractor();
+        for(ThreatIntelResults results : extractor.extract(line)) {
+            System.out.println(results);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
new file mode 100644
index 0000000..9611150
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
@@ -0,0 +1,20 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+/**
+ * Base class for STIX object-type handlers: captures the concrete
+ * ObjectPropertiesType subclass the handler extracts from, so handlers can be
+ * looked up by the runtime type of an observable's properties.
+ */
+public abstract class AbstractObjectTypeHandler<T extends ObjectPropertiesType> implements ObjectTypeHandler<T> {
+ // The concrete properties type handled (e.g. Address.class).
+ protected Class<T> objectPropertiesType;
+ public AbstractObjectTypeHandler(Class<T> clazz) {
+ objectPropertiesType = clazz;
+ }
+ @Override
+ public Class<T> getTypeClass() {
+ return objectPropertiesType;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
new file mode 100644
index 0000000..4a280e1
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -0,0 +1,61 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.base.Splitter;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.ConditionApplicationEnum;
+import org.mitre.cybox.common_2.ConditionTypeEnum;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Address;
+import org.mitre.cybox.objects.CategoryTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+/**
+ * STIX object-type handler for Address observables.  Only e-mail, IPv4, IPv6
+ * and MAC categories are emitted; the set can be narrowed further via the
+ * "stix_address_categories" config entry (comma-separated enum names).
+ */
+public class AddressHandler extends AbstractObjectTypeHandler<Address> {
+    public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
+    public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
+            ,CategoryTypeEnum.IPV_4_ADDR
+            ,CategoryTypeEnum.IPV_6_ADDR
+            ,CategoryTypeEnum.MAC
+    ) ;
+    public AddressHandler() {
+        super(Address.class);
+    }
+
+    @Override
+    public Iterable<ThreatIntelResults> extract(final Address type, Map<String, Object> config) throws IOException {
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        final CategoryTypeEnum category= type.getCategory();
+        if(!SUPPORTED_CATEGORIES.contains(category)) {
+            return ret;
+        }
+        if(config != null && config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
+            List<CategoryTypeEnum> categories = new ArrayList<>();
+            // BUG FIX: trim tokens so configs like "IPV_4_ADDR, MAC" do not
+            // make CategoryTypeEnum.valueOf throw on the leading space.
+            for(String c : Splitter.on(",").trimResults().split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
+                categories.add(CategoryTypeEnum.valueOf(c));
+            }
+            EnumSet<CategoryTypeEnum> specificCategories = EnumSet.copyOf(categories);
+            if(!specificCategories.contains(category)) {
+                return ret;
+            }
+
+        }
+        StringObjectPropertyType value = type.getAddressValue();
+        for(String token : StixExtractor.split(value)) {
+            // Record provenance alongside the indicator itself.
+            ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+                    new HashMap<String, String>() {{
+                        put("source-type", "STIX");
+                        put("indicator-type", "Address");
+                        put("source", type.toXMLString());
+                    }}
+            );
+            ret.add(results);
+        }
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
new file mode 100644
index 0000000..51769ff
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -0,0 +1,41 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.DomainName;
+import org.mitre.cybox.objects.DomainNameTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+/**
+ * Extracts threat intel indicators from STIX DomainName objects.
+ * Only fully-qualified domain names (FQDN) are extracted; other domain
+ * types are skipped.
+ */
+public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
+    // Constant shared by all instances: the only domain type we index.
+    // static final (was a mutable per-instance field) — it is never modified.
+    static final EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
+
+    // Registers this handler for the CybOX DomainName object type.
+    public DomainHandler() {
+        super(DomainName.class);
+    }
+
+    /**
+     * Extracts one ThreatIntelResults per token of the domain value.
+     *
+     * @param type   the CybOX DomainName to extract from
+     * @param config extractor configuration (unused by this handler)
+     * @return extracted results; empty when the domain type is unsupported
+     * @throws IOException on extraction failure
+     */
+    @Override
+    public Iterable<ThreatIntelResults> extract(final DomainName type, Map<String, Object> config) throws IOException {
+        List<ThreatIntelResults> ret = new ArrayList<>();
+        final DomainNameTypeEnum domainType = type.getType();
+        if(SUPPORTED_TYPES.contains(domainType)) {
+            StringObjectPropertyType value = type.getValue();
+            for (String token : StixExtractor.split(value)) {
+                ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+                        new HashMap<String, String>() {{
+                            put("source-type", "STIX");
+                            put("indicator-type", "DomainName");
+                            put("source", type.toXMLString());
+                        }}
+                );
+                ret.add(results);
+            }
+        }
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
new file mode 100644
index 0000000..d6bfd05
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -0,0 +1,39 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Hostname;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+/**
+ * Extracts threat intel indicators from STIX Hostname objects.  Every token
+ * of the hostname value becomes one ThreatIntelResults entry tagged with
+ * STIX provenance metadata.
+ */
+public class HostnameHandler extends AbstractObjectTypeHandler<Hostname>{
+    // Registers this handler for the CybOX Hostname object type.
+    public HostnameHandler() {
+        super(Hostname.class);
+    }
+
+    /**
+     * Extracts one ThreatIntelResults per token of the hostname value.
+     *
+     * @param type   the CybOX Hostname to extract from
+     * @param config extractor configuration (unused by this handler)
+     * @return the extracted results, one per hostname token
+     * @throws IOException on extraction failure
+     */
+    @Override
+    public Iterable<ThreatIntelResults> extract(final Hostname type, Map<String, Object> config) throws IOException {
+        StringObjectPropertyType hostnameValue = type.getHostnameValue();
+        final List<ThreatIntelResults> extracted = new ArrayList<>();
+        for(String indicator : StixExtractor.split(hostnameValue)) {
+            Map<String, String> metadata = new HashMap<>();
+            metadata.put("source-type", "STIX");
+            metadata.put("indicator-type", "Hostname");
+            metadata.put("source", type.toXMLString());
+            extracted.add(new ThreatIntelResults(new ThreatIntelKey(indicator), metadata));
+        }
+        return extracted;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
new file mode 100644
index 0000000..8de5dd2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -0,0 +1,16 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+/**
+ * Strategy for extracting threat intel results from one CybOX object type
+ * (e.g. Address, Hostname, DomainName).
+ *
+ * @param <T> the CybOX object-properties type this handler understands
+ */
+public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
+    // Extracts zero or more ThreatIntelResults from the given object,
+    // optionally steered by the extractor config map.
+    Iterable<ThreatIntelResults> extract(T type, Map<String, Object> config) throws IOException;
+    // The concrete CybOX class this handler is registered for; used for
+    // runtime dispatch by instance type.
+    Class<T> getTypeClass();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
new file mode 100644
index 0000000..85d20b2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
@@ -0,0 +1,28 @@
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+/**
+ * Created by cstella on 2/9/16.
+ */
+public enum ObjectTypeHandlers {
+ ADDRESS(new AddressHandler())
+ ,HOSTNAME(new HostnameHandler())
+ ,DOMAINNAME(new DomainHandler())
+ ,;
+ ObjectTypeHandler _handler;
+ ObjectTypeHandlers(ObjectTypeHandler handler) {
+ _handler = handler;
+ }
+ ObjectTypeHandler getHandler() {
+ return _handler;
+ }
+ public static ObjectTypeHandler getHandlerByInstance(ObjectPropertiesType inst) {
+ for(ObjectTypeHandlers h : values()) {
+ if(inst.getClass().equals(h.getHandler().getTypeClass())) {
+ return h.getHandler();
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
new file mode 100644
index 0000000..84931df
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -0,0 +1,52 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+
+import java.io.IOException;
+
+/**
+ * Created by cstella on 2/3/16.
+ */
+/**
+ * MapReduce mapper that bulk-loads threat intel indicators into HBase.
+ * Each input line is run through the configured Extractor and every
+ * extracted result is emitted as an HBase Put keyed by the indicator.
+ */
+public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put>
+{
+    // Required job-configuration keys.
+    public static final String CONFIG_KEY="bl_extractor_config";
+    public static final String COLUMN_FAMILY_KEY = "bl_column_family";
+    public static final String LAST_SEEN_KEY = "bl_last_seen";
+    Extractor extractor = null;
+    String columnFamily = null;
+    Long lastSeen = null;
+
+    @Override
+    public void setup(Context context) throws IOException,
+            InterruptedException {
+        initialize(context.getConfiguration());
+    }
+
+    /**
+     * Extracts threat intel results from one input line and writes each as a
+     * Put.  Null results from the extractor are skipped.
+     */
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+        for(ThreatIntelResults results : extractor.extract(value.toString())) {
+            if (results != null) {
+                Put put = Converter.INSTANCE.toPut(columnFamily, results.getKey(), results.getValue(), lastSeen);
+                write(new ImmutableBytesWritable(results.getKey().toBytes()), put, context);
+            }
+        }
+    }
+
+    /**
+     * Reads and validates the job configuration.  Fails fast with a clear
+     * message when a required key is missing or malformed, instead of the
+     * opaque NullPointerException Long.parseLong(null) would raise later.
+     */
+    protected void initialize(Configuration configuration) throws IOException {
+        String configStr = configuration.get(CONFIG_KEY);
+        if(configStr == null) {
+            throw new IllegalStateException("Missing required configuration: " + CONFIG_KEY);
+        }
+        extractor = ExtractorHandler.load(configStr).getExtractor();
+        columnFamily = configuration.get(COLUMN_FAMILY_KEY);
+        if(columnFamily == null) {
+            throw new IllegalStateException("Missing required configuration: " + COLUMN_FAMILY_KEY);
+        }
+        String lastSeenStr = configuration.get(LAST_SEEN_KEY);
+        if(lastSeenStr == null) {
+            throw new IllegalStateException("Missing required configuration: " + LAST_SEEN_KEY);
+        }
+        try {
+            lastSeen = Long.parseLong(lastSeenStr);
+        } catch(NumberFormatException nfe) {
+            throw new IllegalStateException(
+                    LAST_SEEN_KEY + " must be a long timestamp, got: " + lastSeenStr, nfe);
+        }
+    }
+
+    // Seam for unit tests: subclasses can capture emitted key/value pairs.
+    protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+        context.write(key, value);
+    }
+}