You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/16 18:10:21 UTC
[5/5] incubator-metron git commit: METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22
METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1ddfd12c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1ddfd12c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1ddfd12c
Branch: refs/heads/master
Commit: 1ddfd12c274ccf45042d9a6533e96840936c1aaa
Parents: 905b09f
Author: cestella <ce...@gmail.com>
Authored: Tue Feb 16 12:09:54 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Feb 16 12:09:54 2016 -0500
----------------------------------------------------------------------
.gitignore | 2 +-
.../inventory/singlenode-vagrant/group_vars/all | 2 +
deployment/roles/hadoop_setup/tasks/main.yml | 6 +-
.../roles/metron_streaming/tasks/main.yml | 14 +
deployment/roles/metron_streaming/vars/main.yml | 2 +
metron-streaming/Metron-Alerts/pom.xml | 274 ++++----
.../src/main/resources/hbase-site.xml | 131 ----
metron-streaming/Metron-Common/pom.xml | 378 ++++++-----
.../enrichment/EnrichmentSplitterBolt.java | 112 ++++
.../interfaces/EnrichmentAdapter.java | 1 +
.../java/org/apache/metron/hbase/Connector.java | 6 +-
.../java/org/apache/metron/hbase/HBaseBolt.java | 37 +-
.../apache/metron/hbase/HTableConnector.java | 38 +-
.../org/apache/metron/hbase/HTableProvider.java | 17 +
.../org/apache/metron/hbase/TableConfig.java | 103 +++
.../org/apache/metron/hbase/TableProvider.java | 14 +
.../apache/metron/hbase/TupleTableConfig.java | 102 +--
.../apache/metron/reference/lookup/Lookup.java | 61 ++
.../metron/reference/lookup/LookupKey.java | 8 +
.../lookup/accesstracker/AccessTracker.java | 23 +
.../lookup/accesstracker/AccessTrackerUtil.java | 69 ++
.../accesstracker/BloomAccessTracker.java | 132 ++++
.../accesstracker/PersistentAccessTracker.java | 192 ++++++
.../reference/lookup/handler/Handler.java | 13 +
.../metron/threatintel/ThreatIntelKey.java | 75 +++
.../metron/threatintel/ThreatIntelResults.java | 55 ++
.../metron/threatintel/hbase/Converter.java | 82 +++
.../threatintel/hbase/ThreatIntelLookup.java | 60 ++
metron-streaming/Metron-DataLoads/pom.xml | 339 +++++++---
.../src/main/assembly/assembly.xml | 28 +
.../src/main/bash/threatintel_bulk_load.sh | 21 +
.../src/main/bash/threatintel_bulk_prune.sh | 21 +
.../dataloads/LeastRecentlyUsedPruner.java | 207 ++++++
.../metron/dataloads/ThreatIntelBulkLoader.java | 199 ++++++
.../metron/dataloads/extractor/Extractor.java | 14 +
.../dataloads/extractor/ExtractorCreator.java | 10 +
.../dataloads/extractor/ExtractorHandler.java | 73 ++
.../metron/dataloads/extractor/Extractors.java | 44 ++
.../dataloads/extractor/csv/CSVExtractor.java | 99 +++
.../extractor/inputformat/Formats.java | 41 ++
.../inputformat/InputFormatHandler.java | 14 +
.../extractor/inputformat/WholeFileFormat.java | 95 +++
.../dataloads/extractor/stix/StixExtractor.java | 96 +++
.../stix/types/AbstractObjectTypeHandler.java | 20 +
.../extractor/stix/types/AddressHandler.java | 61 ++
.../extractor/stix/types/DomainHandler.java | 41 ++
.../extractor/stix/types/HostnameHandler.java | 39 ++
.../extractor/stix/types/ObjectTypeHandler.java | 16 +
.../stix/types/ObjectTypeHandlers.java | 28 +
.../dataloads/hbase/mr/BulkLoadMapper.java | 52 ++
.../metron/dataloads/hbase/mr/PrunerMapper.java | 62 ++
.../src/main/resources/hbase-site.xml | 100 ---
.../dataloads/extractor/ExtractorTest.java | 61 ++
.../extractor/csv/CSVExtractorTest.java | 102 +++
.../extractor/stix/StixExtractorTest.java | 185 +++++
.../dataloads/hbase/HBaseConverterTest.java | 53 ++
.../hbase/mr/BulkLoadMapperIntegrationTest.java | 91 +++
.../dataloads/hbase/mr/BulkLoadMapperTest.java | 78 +++
.../metron/dataloads/hbase/mr/HBaseUtil.java | 58 ++
.../LeastRecentlyUsedPrunerIntegrationTest.java | 124 ++++
.../src/main/resources/hbase-site.xml | 127 ----
.../Metron-EnrichmentAdapters/pom.xml | 302 ++++-----
.../adapters/cif/CIFHbaseAdapter.java | 5 +
.../enrichment/adapters/geo/GeoAdapter.java | 5 +
.../adapters/host/HostFromJSONListAdapter.java | 5 +
.../host/HostFromPropertiesFileAdapter.java | 7 +-
.../enrichment/adapters/jdbc/JdbcAdapter.java | 4 +-
.../adapters/threat/ThreatHbaseAdapter.java | 8 +-
.../adapters/whois/WhoisHBaseAdapter.java | 7 +-
.../enrichment/bolt/EnrichmentJoinBolt.java | 15 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 19 +-
.../metron/threatintel/ThreatIntelAdapter.java | 97 +++
.../metron/threatintel/ThreatIntelConfig.java | 111 +++
.../src/main/resources/hbase-site.xml | 131 ----
.../adapters/cif/CIFHbaseAdapterTest.java | 4 +-
.../adapters/geo/GeoMysqlAdapterTest.java | 4 +-
.../adapters/whois/WhoisHBaseAdapterTest.java | 3 +-
metron-streaming/Metron-Indexing/pom.xml | 188 +++---
metron-streaming/Metron-MessageParsers/pom.xml | 10 +
.../apache/metron/bolt/TelemetryParserBolt.java | 54 +-
.../src/main/resources/hbase-site.xml | 127 ----
metron-streaming/Metron-Topologies/pom.xml | 428 +++++++-----
.../Metron_Configs/etc/env/config.properties | 10 +-
.../resources/Metron_Configs/etc/hbase-site.xml | 127 ----
.../Metron_Configs/topologies/pcap/local.yaml | 122 +++-
.../Metron_Configs/topologies/pcap/remote.yaml | 190 ++++--
.../src/main/resources/hbase-site.xml | 131 ----
.../integration/pcap/PcapIntegrationTest.java | 53 +-
.../util/integration/ComponentRunner.java | 2 +-
.../components/ElasticSearchComponent.java | 11 +-
.../integration/util/mock/MockGeoAdapter.java | 5 +
.../integration/util/mock/MockHTable.java | 668 +++++++++++++++++++
.../util/threatintel/ThreatIntelHelper.java | 22 +
metron-streaming/pom.xml | 17 +-
94 files changed, 5583 insertions(+), 1917 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 7da5300..c9d729b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,4 @@ target
*.project
*.classpath
*.settings
-
+*hbase-site.xml
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/inventory/singlenode-vagrant/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/singlenode-vagrant/group_vars/all b/deployment/inventory/singlenode-vagrant/group_vars/all
index 44d954b..bf3c610 100644
--- a/deployment/inventory/singlenode-vagrant/group_vars/all
+++ b/deployment/inventory/singlenode-vagrant/group_vars/all
@@ -29,6 +29,8 @@ yaf_topic: "ipfix"
snort_topic: "snort"
bro_topic: "bro"
pcap_hbase_table: "pcap_test"
+tracker_hbase_table: "access_tracker"
+threatintel_ip_hbase_table: "malicious_ip"
pycapa_repo: "https://github.com/OpenSOC/pycapa.git"
pycapa_home: "/opt/pycapa"
pycapa_topic: "pcap"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/hadoop_setup/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/hadoop_setup/tasks/main.yml b/deployment/roles/hadoop_setup/tasks/main.yml
index 2e240e8..d8069a0 100644
--- a/deployment/roles/hadoop_setup/tasks/main.yml
+++ b/deployment/roles/hadoop_setup/tasks/main.yml
@@ -1,7 +1,11 @@
---
- name: Create HBase tables
- shell: echo "create '{{ pcap_hbase_table }}','t'" | hbase shell -n
+ shell: echo "create '{{ item }}','t'" | hbase shell -n
ignore_errors: yes
+ with_items:
+ - "{{ pcap_hbase_table }}"
+ - "{{ tracker_hbase_table }}"
+ - "{{ threatintel_ip_hbase_table }}"
- name: Create Kafka topics
shell: "{{ kafka_home }}/bin/kafka-topics.sh --zookeeper {{ zookeeper_url }} --create --topic {{ item }} --partitions 1 --replication-factor 1"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index de387cf..0164d55 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -3,6 +3,7 @@
file: path="{{ metron_directory }}/{{ item.name }}" state=directory mode=0755
with_items:
- { name: 'lib'}
+ - { name: 'bin'}
- { name: 'config'}
- stat: path={{ metron_jar_path }}
@@ -16,6 +17,14 @@
src: "{{ metron_jar_path }}"
dest: "{{ metron_directory }}/lib/"
+- name: Copy Metron DataLoads bundle
+ copy:
+ src: "{{ metron_dataloads_path }}"
+ dest: "{{ metron_directory }}"
+
+- name: Unbundle Metron DataLoads bundle
+ shell: cd {{ metron_directory }} && tar xzvf *.tar.gz
+
- name: Alternatives link for "java"
alternatives: name={{ item.name }} link={{ item.link }} path={{ item.path }}
with_items:
@@ -50,6 +59,11 @@
- { regexp: "spout.kafka.topic.pcap=", line: "spout.kafka.topic.pcap={{ pycapa_topic }}" }
- { regexp: "spout.kafka.topic.bro=", line: "spout.kafka.topic.bro={{ bro_topic }}" }
- { regexp: "bolt.hbase.table.name=", line: "bolt.hbase.table.name={{ pcap_hbase_table }}" }
+ - { regexp: "threat.intel.tracker.table=", line: "threat.intel.tracker.table={{ tracker_hbase_table }}" }
+ - { regexp: "threat.intel.tracker.cf=", line: "threat.intel.tracker.cf=t" }
+ - { regexp: "threat.intel.ip.table=", line: "threat.intel.ip.table={{ threatintel_ip_hbase_table }}" }
+ - { regexp: "threat.intel.ip.cf=", line: "threat.intel.ip.cf=t" }
+ - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.search[0] }}" }
- name: Add Elasticsearch templates for topologies
uri:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/deployment/roles/metron_streaming/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/vars/main.yml b/deployment/roles/metron_streaming/vars/main.yml
index e7004c1..a318db3 100644
--- a/deployment/roles/metron_streaming/vars/main.yml
+++ b/deployment/roles/metron_streaming/vars/main.yml
@@ -2,6 +2,8 @@
# vars file for elasticsearch
metron_directory: /usr/metron/{{ metron_version }}
metron_jar_name: Metron-Topologies-{{ metron_version }}.jar
+metron_dataloads_name: Metron-DataLoads-{{ metron_version }}-archive.tar.gz
metron_jar_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_jar_name }}"
+metron_dataloads_path: "{{ playbook_dir }}/../../metron-streaming/Metron-DataLoads/target/{{ metron_dataloads_name }}"
metron_src_config_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs"
metron_properties_config_path: "{{ metron_directory }}/config/etc/env/config.properties"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Alerts/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/pom.xml b/metron-streaming/Metron-Alerts/pom.xml
index 971a4bb..8a2a6c8 100644
--- a/metron-streaming/Metron-Alerts/pom.xml
+++ b/metron-streaming/Metron-Alerts/pom.xml
@@ -10,127 +10,157 @@
the specific language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Streaming</artifactId>
- <version>0.6BETA</version>
- </parent>
- <artifactId>Metron-Alerts</artifactId>
- <name>Metron-Alerts</name>
- <description>Taggers for alerts</description>
- <properties>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>Metron-Alerts</artifactId>
+ <name>Metron-Alerts</name>
+ <description>Taggers for alerts</description>
+ <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <commons.validator.version>1.4.0</commons.validator.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>${global_json_simple_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
- <version>${global_kafka_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>${global_metrics_version}</version>
- </dependency>
- <dependency>
- <groupId>commons-validator</groupId>
- <artifactId>commons-validator</artifactId>
- <version>${commons.validator.version}</version>
- <exclusions>
- <exclusion>
-
- <groupId>commons-beanutils</groupId>
-
- <artifactId>commons-beanutils</artifactId>
-
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <configuration>
- <systemProperties>
- <property>
- <name>mode</name>
- <value>local</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <targetJdk>1.7</targetJdk>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>emma-maven-plugin</artifactId>
- <version>1.0-alpha-3</version>
- <inherited>true</inherited>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <commons.validator.version>1.4.0</commons.validator.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${global_json_simple_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-validator</groupId>
+ <artifactId>commons-validator</artifactId>
+ <version>${commons.validator.version}</version>
+ <exclusions>
+ <exclusion>
+
+ <groupId>commons-beanutils</groupId>
+
+ <artifactId>commons-beanutils</artifactId>
+
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>mode</name>
+ <value>local</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ <inherited>true</inherited>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
deleted file mode 100644
index 8d812a9..0000000
--- a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--Tue Apr 1 18:16:39 2014-->
- <configuration>
- <property>
- <name>hbase.tmp.dir</name>
- <value>/disk/h/hbase</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.chunkpool.maxsize</name>
- <value>0.5</value>
- </property>
- <property>
- <name>hbase.regionserver.codecs</name>
- <value>lzo,gz,snappy</value>
- </property>
- <property>
- <name>hbase.hstore.flush.retries.number</name>
- <value>120</value>
- </property>
- <property>
- <name>hbase.client.keyvalue.maxsize</name>
- <value>10485760</value>
- </property>
- <property>
- <name>hbase.rootdir</name>
- <value>hdfs://nn1:8020/apps/hbase/data</value>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.client.scanner.caching</name>
- <value>100</value>
- </property>
- <property>
- <name>hbase.superuser</name>
- <value>hbase</value>
- </property>
- <property>
- <name>hfile.block.cache.size</name>
- <value>0.40</value>
- </property>
- <property>
- <name>hbase.regionserver.checksum.verify</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.enabled</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>107374182400</value>
- </property>
- <property>
- <name>hbase.cluster.distributed</name>
- <value>true</value>
- </property>
- <property>
- <name>zookeeper.session.timeout</name>
- <value>30000</value>
- </property>
- <property>
- <name>zookeeper.znode.parent</name>
- <value>/hbase-unsecure</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.lowerLimit</name>
- <value>0.38</value>
- </property>
- <property>
- <name>hbase.regionserver.handler.count</name>
- <value>240</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.mslab.chunksize</name>
- <value>8388608</value>
- </property>
- <property>
- <name>hbase.zookeeper.quorum</name>
- <value>zkpr1,zkpr2,zkpr3</value>
- </property>
- <property>
- <name>hbase.zookeeper.useMulti</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.hregion.majorcompaction</name>
- <value>86400000</value>
- </property>
- <property>
- <name>hbase.hstore.blockingStoreFiles</name>
- <value>200</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>2181</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.flush.size</name>
- <value>134217728</value>
- </property>
- <property>
- <name>hbase.security.authorization</name>
- <value>false</value>
- </property>
- <property>
- <name>hbase.regionserver.global.memstore.upperLimit</name>
- <value>0.4</value>
- </property>
- <property>
- <name>hbase.hstore.compactionThreshold</name>
- <value>4</value>
- </property>
- <property>
- <name>hbase.hregion.memstore.block.multiplier</name>
- <value>8</value>
- </property>
- <property>
- <name>hbase.security.authentication</name>
- <value>simple</value>
- </property>
- <property>
- <name>dfs.client.read.shortcircuit</name>
- <value>true</value>
- </property>
- <property>
- <name>dfs.domain.socket.path</name>
- <value>/var/run/hdfs/dn_socket</value>
- </property>
- </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 75ee9d9..97ec9c7 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -10,161 +10,229 @@
the specific language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Streaming</artifactId>
- <version>0.6BETA</version>
- </parent>
- <artifactId>Metron-Common</artifactId>
- <name>Metron-Common</name>
- <description>Components common to all enrichments</description>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <commons.config.version>1.10</commons.config.version>
- <hbase.version>0.98.5-hadoop2</hbase.version>
- </properties>
- <repositories>
- <repository>
- <id>Metron-Kraken-Repo</id>
- <name>Metron Kraken Repository</name>
- <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>${global_json_simple_version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
-
- <artifactId>servlet-api</artifactId>
-
- <groupId>javax.servlet</groupId>
-
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
- <version>${global_kafka_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>${global_metrics_version}</version>
- </dependency>
- <dependency>
- <groupId>com.codahale.metrics</groupId>
- <artifactId>metrics-graphite</artifactId>
- <version>${global_metrics_version}</version>
- </dependency>
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- <version>${commons.config.version}</version>
- </dependency>
- <dependency>
- <groupId>org.krakenapps</groupId>
- <artifactId>kraken-pcap</artifactId>
- <version>${global_pcap_version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${global_junit_version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.github.fge</groupId>
- <artifactId>json-schema-validator</artifactId>
- <version>${global_json_schema_validator_version}</version>
- </dependency>
- </dependencies>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>Metron-Common</artifactId>
+ <name>Metron-Common</name>
+ <description>Components common to all enrichments</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <commons.config.version>1.10</commons.config.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>Metron-Kraken-Repo</id>
+ <name>Metron Kraken Repository</name>
+ <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${global_opencsv_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${global_json_simple_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${global_slf4j_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${global_metrics_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons.config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.krakenapps</groupId>
+ <artifactId>kraken-pcap</artifactId>
+ <version>${global_pcap_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-simple</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${global_junit_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-validator</artifactId>
+ <version>${global_json_schema_validator_version}</version>
+ </dependency>
+ </dependencies>
- <reporting>
- <plugins>
- <!-- Normally, dependency report takes time, skip it -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.7</version>
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
- <configuration>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>emma-maven-plugin</artifactId>
- <version>1.0-alpha-3</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <configuration>
- <targetJdk>1.7</targetJdk>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <compilerArgument>-Xlint:unchecked</compilerArgument>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- </build>
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>1.7</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <compilerArgument>-Xlint:unchecked</compilerArgument>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>.yaml</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
new file mode 100644
index 0000000..503d3ae
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
@@ -0,0 +1,112 @@
+package org.apache.metron.enrichment;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import org.apache.metron.bolt.SplitBolt;
+import org.apache.metron.domain.Enrichment;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Splits a message into one sub-message per configured {@link Enrichment}.
+ * <p>
+ * Every enrichment contributes an output stream named after the enrichment and,
+ * when it lists any fields, a {@link JSONObject} holding just those fields.
+ */
+public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
+  protected static final Logger LOG = LoggerFactory.getLogger(EnrichmentSplitterBolt.class);
+  /** Enrichments to split for; each one supplies a stream name and the fields it needs. */
+  protected List<Enrichment> enrichments = new ArrayList<>();
+  /** Name of the tuple field that carries the message to split. */
+  protected String messageFieldName = "message";
+
+  /**
+   * @param enrichments The enrichments whose fields are split out of each message
+   * @return Instance of this class
+   */
+  public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
+    this.enrichments = enrichments;
+    return this;
+  }
+
+  /**
+   * @param messageFieldName The tuple field from which the message is read
+   * @return Instance of this class
+   */
+  public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
+    this.messageFieldName = messageFieldName;
+    return this;
+  }
+
+  /** No per-task setup is required. */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+
+  }
+
+  /**
+   * Returns the tuple's "key" field, or a random UUID when the tuple has no usable key.
+   *
+   * @param tuple   The incoming tuple
+   * @param message The message extracted from the tuple (not consulted here)
+   * @return The key identifying this message; never null
+   */
+  @Override
+  public String getKey(Tuple tuple, JSONObject message) {
+    String key = null;
+    try {
+      key = tuple.getStringByField("key");
+    }
+    catch(RuntimeException e) {
+      // Best effort: a missing or non-string "key" field falls through to a generated key.
+      // Only RuntimeException is caught so that JVM Errors are no longer silently swallowed.
+      LOG.debug("Unable to read the \"key\" field from the tuple; generating a random key", e);
+    }
+    return key != null ? key : UUID.randomUUID().toString();
+  }
+
+  /**
+   * @param tuple The incoming tuple
+   * @return A single-element list holding the value of the configured message field
+   */
+  @Override
+  public List<JSONObject> generateMessages(Tuple tuple) {
+    return Arrays.asList((JSONObject)tuple.getValueByField(messageFieldName));
+  }
+
+  /**
+   * @return The names of all configured enrichments, used as stream ids
+   */
+  @Override
+  public Set<String> getStreamIds() {
+    Set<String> streamIds = new HashSet<>();
+    for(Enrichment enrichment: enrichments) {
+      streamIds.add(enrichment.getName());
+    }
+    return streamIds;
+  }
+
+  /**
+   * Builds, for every enrichment that lists at least one field, a JSON object
+   * containing those fields as resolved by {@link #getField(JSONObject, String)}.
+   *
+   * @param message The message to split
+   * @return Map from enrichment name to the sub-message for that enrichment;
+   *         enrichments without fields are omitted
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, JSONObject> splitMessage(JSONObject message) {
+    Map<String, JSONObject> streamMessageMap = new HashMap<>();
+    for (Enrichment enrichment : enrichments) {
+      List<String> fields = enrichment.getFields();
+      if (fields != null && !fields.isEmpty()) {
+        JSONObject enrichmentObject = new JSONObject();
+        for (String field : fields) {
+          enrichmentObject.put(field, getField(message, field));
+        }
+        streamMessageMap.put(enrichment.getName(), enrichmentObject);
+      }
+    }
+    return streamMessageMap;
+  }
+
+  /**
+   * Resolves a '/'-separated path against a (possibly nested) JSON object.
+   * <p>
+   * Each path segment is looked up in the current map. Descent continues only while
+   * the value found is itself a map; the first non-map value (including null for a
+   * missing segment) is returned immediately, even if path segments remain.
+   *
+   * @param object The object to read from; null yields null
+   * @param path   The '/'-separated path
+   * @return The value found, the nested map the path ends on, or null
+   */
+  public Object getField(JSONObject object, String path) {
+    if(object == null) {
+      // A tuple without a message would otherwise fail with a NullPointerException below.
+      return null;
+    }
+    Map<?, ?> ret = object;
+    for(String node: Splitter.on('/').split(path)) {
+      Object o = ret.get(node);
+      if(o instanceof Map) {
+        ret = (Map<?, ?>) o;
+      }
+      else {
+        return o;
+      }
+    }
+    return ret;
+  }
+
+  /** No additional streams are declared. */
+  @Override
+  public void declareOther(OutputFieldsDeclarer declarer) {
+
+  }
+
+  /** Nothing besides the split messages is emitted. */
+  @Override
+  public void emitOther(Tuple tuple, List<JSONObject> messages) {
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
index 8eb5937..a51bebd 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -21,6 +21,7 @@ import org.json.simple.JSONObject;
public interface EnrichmentAdapter<T>
{
+ void logAccess(T value);
JSONObject enrich(T value);
boolean initializeAdapter();
void cleanup();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
index 33b4eb8..a025e70 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java
@@ -11,15 +11,15 @@ import java.io.Serializable;
* Created by cstella on 1/29/16.
*/
public abstract class Connector {
- protected TupleTableConfig tableConf;
+ protected TableConfig tableConf;
protected String _quorum;
protected String _port;
- public Connector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+ public Connector(final TableConfig conf, String _quorum, String _port) throws IOException {
this.tableConf = conf;
this._quorum = _quorum;
this._port = _port;
}
- public abstract void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException;
+ public abstract void put(Put put) throws IOException;
public abstract void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
index 64f3531..e026b18 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
@@ -11,6 +11,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
@@ -44,7 +45,6 @@ public class HBaseBolt implements IRichBolt {
protected TupleTableConfig conf;
protected boolean autoAck = true;
protected Connector connector;
- private String connectorImpl;
private String _quorum;
private String _port;
@@ -70,32 +70,11 @@ public class HBaseBolt implements IRichBolt {
String hostPortPair = Iterables.getFirst(Splitter.on(",").split(connString), "");
return Iterables.getLast(Splitter.on(":").split(hostPortPair),DEFAULT_ZK_PORT);
}
- public HBaseBolt withConnector(String connectorImpl) {
- this.connectorImpl = connectorImpl;
- return this;
- }
+
public Connector createConnector() throws IOException{
initialize();
- if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
- return new HTableConnector(conf, _quorum, _port);
- }
- else {
- try {
- Class<? extends Connector> clazz = (Class<? extends Connector>) Class.forName(connectorImpl);
- return clazz.getConstructor(TupleTableConfig.class, String.class, String.class).newInstance(conf, _quorum, _port);
- } catch (InstantiationException e) {
- throw new IOException("Unable to instantiate connector.", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Unable to instantiate connector: illegal access", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Unable to instantiate connector", e);
- } catch (NoSuchMethodException e) {
- throw new IOException("Unable to instantiate connector: no such method", e);
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to instantiate connector: class not found", e);
- }
- }
+ return new HTableConnector(conf, _quorum, _port);
}
public void initialize() {
@@ -121,8 +100,9 @@ public class HBaseBolt implements IRichBolt {
this.collector = collector;
try {
- this.connector = createConnector();
-
+ if(connector == null) {
+ this.connector = createConnector();
+ }
} catch (IOException e) {
throw new RuntimeException(e);
@@ -135,11 +115,12 @@ public class HBaseBolt implements IRichBolt {
public void execute(Tuple input) {
try {
- this.connector.put(conf.getPutFromTuple(input));
+ Put p = conf.getPutFromTuple(input);
+ this.connector.put(p);
} catch (IOException ex) {
JSONObject error = ErrorGenerator.generateErrorMessage(
- "Alerts problem: " + input.getBinary(0), ex);
+ "Alerts problem: " + input.toString(), ex);
collector.emit("error", new Values(error));
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
index 5302882..74c6acf 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java
@@ -3,6 +3,7 @@ package org.apache.metron.hbase;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -11,6 +12,7 @@ import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -30,10 +32,9 @@ import javax.annotation.Nullable;
public class HTableConnector extends Connector implements Serializable{
private static final Logger LOG = Logger.getLogger(HTableConnector.class);
private Configuration conf;
- protected HTable table;
+ protected HTableInterface table;
private String tableName;
-
-
+ private String connectorImpl;
/**
@@ -41,8 +42,9 @@ public class HTableConnector extends Connector implements Serializable{
* @param conf The {@link TupleTableConfig}
* @throws IOException
*/
- public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+ public HTableConnector(final TableConfig conf, String _quorum, String _port) throws IOException {
super(conf, _quorum, _port);
+ this.connectorImpl = conf.getConnectorImpl();
this.tableName = conf.getTableName();
this.conf = HBaseConfiguration.create();
@@ -56,7 +58,7 @@ public class HTableConnector extends Connector implements Serializable{
this.conf.get("hbase.rootdir")));
try {
- this.table = new HTable(this.conf, this.tableName);
+ this.table = getTableProvider().getTable(this.conf, this.tableName);
} catch (IOException ex) {
throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
}
@@ -88,6 +90,28 @@ public class HTableConnector extends Connector implements Serializable{
}
}
+ protected TableProvider getTableProvider() throws IOException {
+ if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+ return new HTableProvider();
+ }
+ else {
+ try {
+ Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
+ return clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw new IOException("Unable to instantiate connector.", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Unable to instantiate connector: illegal access", e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Unable to instantiate connector", e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException("Unable to instantiate connector: no such method", e);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Unable to instantiate connector: class not found", e);
+ }
+ }
+ }
+
/**
* Checks to see if table contains the given column family
* @param columnFamily The column family name
@@ -101,12 +125,12 @@ public class HTableConnector extends Connector implements Serializable{
/**
* @return the table
*/
- public HTable getTable() {
+ public HTableInterface getTable() {
return table;
}
@Override
- public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ public void put(Put put) throws IOException {
table.put(put);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
new file mode 100644
index 0000000..f7e066c
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -0,0 +1,17 @@
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+
+/**
+ * {@link TableProvider} that opens a real {@link HTable} for the requested table.
+ */
+public class HTableProvider implements TableProvider {
+  /**
+   * @param config    The HBase configuration to connect with
+   * @param tableName The name of the table to open
+   * @return A new {@link HTable} for the given table
+   * @throws IOException If the table cannot be opened
+   */
+  @Override
+  public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+    final HTableInterface table = new HTable(config, tableName);
+    return table;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
new file mode 100644
index 0000000..8c0966b
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableConfig.java
@@ -0,0 +1,103 @@
+package org.apache.metron.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Serializable holder for the table-level HBase settings: table name, client-side
+ * batching, write buffer size, column families and the table provider implementation.
+ */
+public class TableConfig implements Serializable {
+  static final long serialVersionUID = -1L;
+  private String tableName;
+  private boolean batch = true;
+  protected Map<String, Set<String>> columnFamilies = new HashMap<>();
+  private long writeBufferSize = 0L;
+  private String connectorImpl;
+
+  /** Creates a configuration with no table name set. */
+  public TableConfig() {
+
+  }
+
+  /**
+   * @param tableName The name of the HBase table
+   */
+  public TableConfig(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @param table The name of the HBase table
+   * @return Instance of this class
+   */
+  public TableConfig withTable(String table) {
+    this.tableName = table;
+    return this;
+  }
+
+  /**
+   * @param impl The class name of the table provider implementation
+   * @return Instance of this class
+   */
+  public TableConfig withConnectorImpl(String impl) {
+    this.connectorImpl = impl;
+    return this;
+  }
+
+  /**
+   * @param isBatch Whether batch mode is enabled
+   * @return Instance of this class
+   */
+  public TableConfig withBatch(Boolean isBatch) {
+    this.batch = isBatch;
+    return this;
+  }
+
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * @return The class name of the table provider implementation, if one was set
+   */
+  public String getConnectorImpl() {
+    return this.connectorImpl;
+  }
+
+  /**
+   * @return Whether batch mode is enabled
+   */
+  public boolean isBatch() {
+    return this.batch;
+  }
+
+  /**
+   * Turns HBase's client-side write buffer on or off. Enabled by default.
+   * <p>
+   * With the buffer enabled, put operations are held locally until the write
+   * buffer is full and are then sent to HBase in a single RPC call. With it
+   * disabled, each put is effectively its own RPC sent straight to HBase.
+   * Because a bolt can process thousands of values per second, keeping the
+   * write buffer enabled is recommended.
+   *
+   * @param batch
+   *          Whether to enable HBase's client-side write buffer
+   */
+  public void setBatch(boolean batch) {
+    this.batch = batch;
+  }
+
+  /**
+   * @return the writeBufferSize
+   */
+  public long getWriteBufferSize() {
+    return this.writeBufferSize;
+  }
+
+  /**
+   * Overrides the client-side write buffer size, taking precedence over the
+   * value set in hbase-site.xml (e.g. <code>hbase.client.write.buffer</code>).
+   * <p>
+   * By default the write buffer size is 2 MB (2097152 bytes). When storing
+   * larger data, consider increasing this value so that a larger number of
+   * records can be grouped together per RPC.
+   *
+   * @param writeBufferSize
+   *          The client-side write buffer size
+   */
+  public void setWriteBufferSize(long writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+  }
+
+  /**
+   * @return A Set of configured column families
+   */
+  public Set<String> getColumnFamilies() {
+    final Set<String> families = this.columnFamilies.keySet();
+    return families;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
new file mode 100644
index 0000000..d643a83
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TableProvider.java
@@ -0,0 +1,14 @@
+package org.apache.metron.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Created by cstella on 2/11/16.
+ * <p>
+ * Supplies the {@link HTableInterface} used to talk to an HBase table.
+ * HTableConnector instantiates a configured implementation reflectively through
+ * its no-arg constructor, so implementations need one.
+ */
+public interface TableProvider extends Serializable {
+ /**
+ * @param config The HBase configuration to use
+ * @param tableName The name of the table to open
+ * @return A handle to the named table
+ * @throws IOException If the table cannot be obtained
+ */
+ HTableInterface getTable(Configuration config, String tableName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
index d2c789a..6bacfb8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java
@@ -1,5 +1,6 @@
package org.apache.metron.hbase;
+import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
@@ -8,28 +9,27 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import com.google.common.base.Joiner;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
/**
* Configuration for Storm {@link Tuple} to HBase serialization.
*/
@SuppressWarnings("serial")
-public class TupleTableConfig implements Serializable {
-
+public class TupleTableConfig extends TableConfig implements Serializable {
+ private static final Logger LOG = Logger.getLogger(TupleTableConfig.class);
+ static final long serialVersionUID = -1L;
public static final long DEFAULT_INCREMENT = 1L;
- private String tableName;
protected String tupleRowKeyField;
protected String tupleTimestampField;
- protected Map<String, Set<String>> columnFamilies;
- private boolean batch = true;
protected Durability durability = Durability.USE_DEFAULT;
- private long writeBufferSize = 0L;
private String fields;
/**
@@ -41,7 +41,7 @@ public class TupleTableConfig implements Serializable {
* The {@link Tuple} field used to set the rowKey
*/
public TupleTableConfig(final String table, final String rowKeyField) {
- this.tableName = table;
+ super(table);
this.tupleRowKeyField = rowKeyField;
this.tupleTimestampField = "";
this.columnFamilies = new HashMap<String, Set<String>>();
@@ -58,20 +58,18 @@ public class TupleTableConfig implements Serializable {
* The {@link Tuple} field used to set the timestamp
*/
public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
- this.tableName = table;
+ super(table);
this.tupleRowKeyField = rowKeyField;
this.tupleTimestampField = timestampField;
this.columnFamilies = new HashMap<String, Set<String>>();
}
public TupleTableConfig() {
+ super(null);
this.columnFamilies = new HashMap<String, Set<String>>();
}
- public TupleTableConfig withTable(String table) {
- this.tableName = table;
- return this;
- }
+
public TupleTableConfig withRowKeyField(String rowKeyField) {
this.tupleRowKeyField = rowKeyField;
@@ -88,10 +86,7 @@ public class TupleTableConfig implements Serializable {
return this;
}
- public TupleTableConfig withBatch(Boolean isBatch) {
- this.batch = isBatch;
- return this;
- }
+
public String getFields() {
return fields;
@@ -125,8 +120,14 @@ public class TupleTableConfig implements Serializable {
* The {@link Tuple}
* @return {@link Put}
*/
- public Put getPutFromTuple(final Tuple tuple) {
- byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+ public Put getPutFromTuple(final Tuple tuple) throws IOException{
+ byte[] rowKey = null;
+ try {
+ rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+ }
+ catch(IllegalArgumentException iae) {
+ throw new IOException("Unable to retrieve " + tupleRowKeyField + " from " + tuple + " [ " + Joiner.on(',').join(tuple.getFields()) + " ]", iae);
+ }
long ts = 0;
if (!tupleTimestampField.equals("")) {
@@ -225,37 +226,8 @@ public class TupleTableConfig implements Serializable {
inc.getFamilyMapOfLongs().put(family, set);
}
- /**
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @return Whether batch mode is enabled
- */
- public boolean isBatch() {
- return batch;
- }
-
- /**
- * @param batch
- * Whether to enable HBase's client-side write buffer.
- * <p>
- * When enabled your bolt will store put operations locally until the
- * write buffer is full, so they can be sent to HBase in a single RPC
- * call. When disabled each put operation is effectively an RPC and
- * is sent straight to HBase. As your bolt can process thousands of
- * values per second it is recommended that the write buffer is
- * enabled.
- * <p>
- * Enabled by default
- */
- public void setBatch(boolean batch) {
- this.batch = batch;
- }
-
+
+
/**
* @param durability
* Sets whether to write to HBase's edit log.
@@ -276,36 +248,8 @@ public class TupleTableConfig implements Serializable {
return durability;
}
- /**
- * @param writeBufferSize
- * Overrides the client-side write buffer size.
- * <p>
- * By default the write buffer size is 2 MB (2097152 bytes). If you
- * are storing larger data, you may want to consider increasing this
- * value to allow your bolt to efficiently group together a larger
- * number of records per RPC
- * <p>
- * Overrides the write buffer size you have set in your
- * hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
- */
- public void setWriteBufferSize(long writeBufferSize) {
- this.writeBufferSize = writeBufferSize;
- }
-
- /**
- * @return the writeBufferSize
- */
- public long getWriteBufferSize() {
- return writeBufferSize;
- }
-
- /**
- * @return A Set of configured column families
- */
- public Set<String> getColumnFamilies() {
- return this.columnFamilies.keySet();
- }
-
+
+
/**
* @return the tupleRowKeyField
*/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
new file mode 100644
index 0000000..052c7a6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/Lookup.java
@@ -0,0 +1,61 @@
+package org.apache.metron.reference.lookup;
+
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.handler.Handler;
+
+import java.io.IOException;
+
+/**
+ * A named {@link Handler} that optionally records each key access in an
+ * {@link AccessTracker} before delegating the actual lookup to another handler.
+ *
+ * @param <CONTEXT_T> The context passed through to the underlying handler
+ * @param <KEY_T>     The key type being looked up
+ * @param <RESULT_T>  The type returned by {@link #get}
+ */
+public class Lookup<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> implements Handler<CONTEXT_T, KEY_T, RESULT_T> {
+  private String name;
+  private AccessTracker accessTracker;
+  private Handler<CONTEXT_T, KEY_T, RESULT_T> lookupHandler;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public AccessTracker getAccessTracker() {
+    return accessTracker;
+  }
+
+  public void setAccessTracker(AccessTracker accessTracker) {
+    this.accessTracker = accessTracker;
+  }
+
+  public Handler< CONTEXT_T, KEY_T, RESULT_T > getLookupHandler() {
+    return lookupHandler;
+  }
+
+  public void setLookupHandler(Handler< CONTEXT_T, KEY_T, RESULT_T > lookupHandler) {
+    this.lookupHandler = lookupHandler;
+  }
+
+  /**
+   * Records the access when requested and an access tracker has been set.
+   */
+  private void track(KEY_T key, boolean logAccess) {
+    if(logAccess && accessTracker != null) {
+      accessTracker.logAccess(key);
+    }
+  }
+
+  /**
+   * @param key       The key to look up
+   * @param context   The context handed to the underlying handler
+   * @param logAccess Whether to record this access in the access tracker
+   * @return Whether the underlying handler reports the key as present
+   * @throws IOException If the underlying handler fails
+   */
+  @Override
+  public boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+    track(key, logAccess);
+    return lookupHandler.exists(key, context, logAccess);
+  }
+
+  /**
+   * @param key       The key to look up
+   * @param context   The context handed to the underlying handler
+   * @param logAccess Whether to record this access in the access tracker
+   * @return The value the underlying handler returns for the key
+   * @throws IOException If the underlying handler fails
+   */
+  @Override
+  public RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException {
+    track(key, logAccess);
+    return lookupHandler.get(key, context, logAccess);
+  }
+
+  /**
+   * Cleans up the access tracker and closes the underlying handler.
+   * <p>
+   * The handler is closed even when the tracker cleanup throws, so a failing
+   * tracker can no longer leak the handler's resources.
+   *
+   * @throws Exception If the tracker cleanup or the handler close fails
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if(accessTracker != null) {
+        accessTracker.cleanup();
+      }
+    }
+    finally {
+      if(lookupHandler != null) {
+        lookupHandler.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
new file mode 100644
index 0000000..3a227fd
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
@@ -0,0 +1,8 @@
+package org.apache.metron.reference.lookup;
+
+/**
+ * Created by cstella on 2/5/16.
+ * <p>
+ * A key that can be looked up and tracked; it exposes itself as raw bytes.
+ */
+public interface LookupKey {
+ /**
+ * @return The byte representation of this key (BloomAccessTracker feeds these
+ * bytes into its bloom filter)
+ */
+ byte[] toBytes();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
new file mode 100644
index 0000000..0b0846e
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTracker.java
@@ -0,0 +1,23 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Created by cstella on 2/5/16.
+ * <p>
+ * Records which {@link LookupKey}s have been accessed. Trackers are
+ * {@link Serializable}; AccessTrackerUtil persists them via Java serialization.
+ */
+public interface AccessTracker extends Serializable{
+ /** Records an access of the given key. */
+ void logAccess(LookupKey key);
+ /** Applies the given configuration; the recognized entries are implementation specific. */
+ void configure(Map<String, Object> config);
+ /** Presumably reports whether the key was previously logged -- confirm against implementations. */
+ boolean hasSeen(LookupKey key);
+ /** @return The name of this tracker */
+ String getName();
+ /**
+ * Combines this tracker with another one.
+ * @return The combined tracker (AccessTrackerUtil folds a sequence of trackers with this)
+ */
+ AccessTracker union(AccessTracker tracker);
+ /** Presumably discards the recorded accesses -- confirm against implementations. */
+ void reset();
+ /** Presumably reports whether the tracker has reached its capacity -- confirm against implementations. */
+ boolean isFull();
+ /** Releases resources held by the tracker; Lookup calls this on close. */
+ void cleanup() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
new file mode 100644
index 0000000..18df030
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/AccessTrackerUtil.java
@@ -0,0 +1,69 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import javax.annotation.Nullable;
+import java.io.*;
+
+/**
+ * Singleton helper for serializing {@link AccessTracker}s and for storing them in,
+ * and loading them from, an HBase table.
+ */
+public enum AccessTrackerUtil {
+  INSTANCE;
+
+  /** Column qualifier under which the serialized tracker is stored. */
+  public static final byte[] COLUMN = Bytes.toBytes("v");
+
+  /**
+   * Rebuilds a tracker from the bytes produced by {@link #serializeTracker(AccessTracker)}.
+   * <p>
+   * NOTE(review): this uses Java-native deserialization, which is only safe when the
+   * bytes come from a trusted source.
+   *
+   * @param bytes The serialized tracker
+   * @return The deserialized tracker
+   * @throws IOException            If the bytes cannot be read as an object stream
+   * @throws ClassNotFoundException If the serialized class is not on the classpath
+   */
+  public AccessTracker deserializeTracker(byte[] bytes) throws IOException, ClassNotFoundException {
+    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+      return (AccessTracker) ois.readObject();
+    }
+  }
+
+  /**
+   * @param tracker The tracker to serialize
+   * @return The Java-serialized form of the tracker
+   * @throws IOException If serialization fails
+   */
+  public byte[] serializeTracker(AccessTracker tracker) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    // Closing the object stream flushes it, so the byte array is complete afterwards;
+    // try-with-resources also closes it when writeObject fails.
+    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+      oos.writeObject(tracker);
+    }
+    return bos.toByteArray();
+  }
+
+  /**
+   * Writes the serialized tracker to the row identified by the key.
+   *
+   * @param accessTrackerTable The table to write to
+   * @param columnFamily       The column family to store the tracker under
+   * @param key                Supplies the row key
+   * @param underlyingTracker  The tracker to persist
+   * @throws IOException If serialization or the put fails
+   */
+  public void persistTracker(HTableInterface accessTrackerTable, String columnFamily, PersistentAccessTracker.AccessTrackerKey key, AccessTracker underlyingTracker) throws IOException {
+    Put put = new Put(key.toRowKey());
+    put.add(Bytes.toBytes(columnFamily), COLUMN, serializeTracker(underlyingTracker));
+    accessTrackerTable.put(put);
+  }
+
+  /**
+   * Scans the table from the row key derived from the name and earliest timestamp,
+   * deserializing one tracker per row as the result is iterated.
+   * <p>
+   * NOTE(review): the returned view is lazy, so the scanner opened here is not
+   * closed by this method.
+   *
+   * @param accessTrackerTable The table to scan
+   * @param columnFamily       The column family the trackers are stored under
+   * @param name               The tracker name used to build the scan start key
+   * @param earliest           The timestamp used to build the scan start key
+   * @return A lazy view of the deserialized trackers
+   * @throws IOException If the scanner cannot be opened
+   */
+  public Iterable<AccessTracker> loadAll(HTableInterface accessTrackerTable, final String columnFamily, final String name, final long earliest) throws IOException {
+    Scan scan = new Scan(PersistentAccessTracker.AccessTrackerKey.getTimestampScanKey(name, earliest));
+    ResultScanner scanner = accessTrackerTable.getScanner(scan);
+    // Computed once rather than for every row.
+    final byte[] family = Bytes.toBytes(columnFamily);
+    return Iterables.transform(scanner, new Function<Result, AccessTracker>() {
+
+      @Nullable
+      @Override
+      public AccessTracker apply(@Nullable Result result) {
+        try {
+          return deserializeTracker(result.getValue(family, COLUMN));
+        } catch (Exception e) {
+          // Keep the cause so the underlying failure is not lost.
+          throw new RuntimeException("Unable to deserialize " + name + " @ " + earliest, e);
+        }
+      }
+    });
+  }
+
+  /**
+   * Folds the trackers into one by repeated {@link AccessTracker#union(AccessTracker)}.
+   *
+   * @param trackers The trackers to combine
+   * @return The combined tracker, or null when there are no trackers
+   */
+  public AccessTracker loadAll(Iterable<AccessTracker> trackers) throws IOException, ClassNotFoundException {
+    AccessTracker tracker = null;
+    for(AccessTracker t : trackers) {
+      if(tracker == null) {
+        tracker = t;
+      }
+      else {
+        tracker = tracker.union(t);
+      }
+    }
+    return tracker;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
new file mode 100644
index 0000000..d8edef5
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/BloomAccessTracker.java
@@ -0,0 +1,132 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An {@link AccessTracker} that records accessed keys in a Guava {@link BloomFilter}.
+ *
+ * The tracker counts calls to {@link #logAccess(LookupKey)} and reports itself as full once
+ * that count reaches the configured number of expected insertions. Because the backing
+ * structure is a bloom filter, {@link #hasSeen(LookupKey)} may return false positives.
+ *
+ * Created by cstella on 2/5/16.
+ */
+public class BloomAccessTracker implements AccessTracker {
+  private static final long serialVersionUID = 1L;
+  public static final String EXPECTED_INSERTIONS_KEY = "expected_insertions";
+  public static final String FALSE_POSITIVE_RATE_KEY = "false_positive_rate";
+  public static final String NAME_KEY = "name";
+
+  /**
+   * Feeds the byte representation of a {@link LookupKey} into the bloom filter's hash.
+   */
+  private static class LookupKeyFunnel implements Funnel<LookupKey> {
+    @Override
+    public void funnel(LookupKey lookupKey, PrimitiveSink primitiveSink) {
+      primitiveSink.putBytes(lookupKey.toBytes());
+    }
+
+    /**
+     * All instances of this funnel are interchangeable, so equality is by class only.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      // Guard against null instead of failing with a NullPointerException.
+      return obj != null && this.getClass().equals(obj.getClass());
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: every instance hashes the same.
+     */
+    @Override
+    public int hashCode() {
+      return LookupKeyFunnel.class.hashCode();
+    }
+  }
+
+  private static final Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
+
+  BloomFilter<LookupKey> filter;
+  String name;
+  int expectedInsertions;
+  double falsePositiveRate;
+  int numInsertions = 0;
+
+  /**
+   * Creates a ready-to-use tracker.
+   *
+   * @param name the tracker name
+   * @param expectedInsertions the number of insertions the filter is sized for
+   * @param falsePositiveRate the desired false positive probability of the filter
+   */
+  public BloomAccessTracker(String name, int expectedInsertions, double falsePositiveRate) {
+    this.name = name;
+    this.expectedInsertions = expectedInsertions;
+    this.falsePositiveRate = falsePositiveRate;
+    filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+  }
+
+  /**
+   * Creates an unconfigured tracker; {@link #configure(Map)} must be called before use
+   * because the filter is not created here.
+   */
+  public BloomAccessTracker() {}
+
+  /**
+   * Creates a tracker and configures it from the given map.
+   *
+   * @param config see {@link #configure(Map)}
+   */
+  public BloomAccessTracker(Map<String, Object> config) {
+    configure(config);
+  }
+
+  protected BloomFilter<LookupKey> getFilter() {
+    return filter;
+  }
+
+  /**
+   * Records an access to the key and bumps the insertion counter.
+   */
+  @Override
+  public void logAccess(LookupKey key) {
+    numInsertions++;
+    filter.put(key);
+  }
+
+  /**
+   * Reads {@link #EXPECTED_INSERTIONS_KEY}, {@link #FALSE_POSITIVE_RATE_KEY} and
+   * {@link #NAME_KEY} from the map and builds a fresh, empty filter.
+   *
+   * @param config the configuration; numeric values may be given as numbers or strings
+   */
+  @Override
+  public void configure(Map<String, Object> config) {
+    expectedInsertions = toInt(config.get(EXPECTED_INSERTIONS_KEY));
+    falsePositiveRate = toDouble(config.get(FALSE_POSITIVE_RATE_KEY));
+    name = config.get(NAME_KEY).toString();
+    filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    // The filter is new and empty, so the insertion count has to start over as well.
+    numInsertions = 0;
+  }
+
+  /**
+   * @return true if the key might have been logged (false positives are possible),
+   *         false if it definitely has not
+   */
+  @Override
+  public boolean hasSeen(LookupKey key) {
+    return filter.mightContain(key);
+  }
+
+  /**
+   * Discards everything recorded so far by replacing the filter with an empty one.
+   */
+  @Override
+  public void reset() {
+    filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
+    // Without this the tracker would keep reporting isFull() == true after a reset.
+    numInsertions = 0;
+  }
+
+  private static double toDouble(Object o) {
+    if(o instanceof String) {
+      return Double.parseDouble((String)o);
+    }
+    else if(o instanceof Number) {
+      return ((Number) o).doubleValue();
+    }
+    else {
+      throw new IllegalStateException("Unable to convert " + o + " to a double.");
+    }
+  }
+
+  private static int toInt(Object o) {
+    if(o instanceof String) {
+      return Integer.parseInt((String)o);
+    }
+    else if(o instanceof Number) {
+      return ((Number) o).intValue();
+    }
+    else {
+      throw new IllegalStateException("Unable to convert " + o + " to an int.");
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Merges the contents of another {@link BloomAccessTracker} into this one.
+   * The insertion counter of this tracker is left unchanged.
+   *
+   * @param tracker the tracker to merge in; must be a {@link BloomAccessTracker}
+   * @return this tracker
+   * @throws IllegalStateException if this tracker has no filter yet or the argument is of
+   *         another type
+   */
+  @Override
+  public AccessTracker union(AccessTracker tracker) {
+    if(filter == null) {
+      throw new IllegalStateException("Unable to union access tracker, because this tracker is not initialized.");
+    }
+    if(tracker instanceof BloomAccessTracker ) {
+      filter.putAll(((BloomAccessTracker)tracker).getFilter());
+      return this;
+    }
+    else {
+      throw new IllegalStateException("Unable to union access tracker, because it's not of the right type (BloomAccessTracker)");
+    }
+  }
+
+  /**
+   * @return true once the number of logged accesses has reached the expected insertions
+   */
+  @Override
+  public boolean isFull() {
+    return numInsertions >= expectedInsertions;
+  }
+
+  /**
+   * Nothing to release for this tracker.
+   */
+  @Override
+  public void cleanup() throws IOException {
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
new file mode 100644
index 0000000..9e239a8
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/accesstracker/PersistentAccessTracker.java
@@ -0,0 +1,192 @@
+package org.apache.metron.reference.lookup.accesstracker;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.log4j.Logger;
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.*;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * An {@link AccessTracker} decorator that periodically writes the wrapped tracker to a table.
+ *
+ * The wrapped tracker is persisted and then reset when it reports itself full, when the
+ * configured time between persists has elapsed (checked by a timer task), and on
+ * {@link #cleanup()}. Access to the wrapped tracker is guarded by a single lock.
+ *
+ * Created by cstella on 2/5/16.
+ */
+public class PersistentAccessTracker implements AccessTracker {
+  private static final Logger LOG = Logger.getLogger(PersistentAccessTracker.class);
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Row key of a persisted tracker: tracker name, timestamp and container name, in that order.
+   */
+  public static class AccessTrackerKey {
+    String name;
+    String containerName;
+    long timestamp;
+    public AccessTrackerKey(String name, String containerName, long timestamp) {
+      this.name = name;
+      this.containerName = containerName;
+      this.timestamp = timestamp;
+    }
+
+    /**
+     * @return the binary row key: name (UTF), timestamp (long), container name (UTF)
+     */
+    public byte[] toRowKey() {
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(os);
+      try {
+        dos.writeUTF(name);
+        dos.writeLong(timestamp);
+        dos.writeUTF(containerName);
+        dos.flush();
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to write rowkey: " + this, e);
+      }
+
+      return os.toByteArray();
+    }
+
+    /**
+     * Builds the prefix of a row key (name and timestamp only) for use as a scan start key.
+     */
+    public static byte[] getTimestampScanKey(String name, long timestamp) {
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(os);
+      try {
+        dos.writeUTF(name);
+        dos.writeLong(timestamp);
+        dos.flush();
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to create scan key " , e);
+      }
+
+      return os.toByteArray();
+    }
+
+    /**
+     * Parses a row key written by {@link #toRowKey()}.
+     */
+    public static AccessTrackerKey fromRowKey(byte[] rowKey) {
+      ByteArrayInputStream is = new ByteArrayInputStream(rowKey);
+      DataInputStream dis = new DataInputStream(is);
+      try {
+        String name = dis.readUTF();
+        long timestamp = dis.readLong();
+        String containerName = dis.readUTF();
+        return new AccessTrackerKey(name, containerName, timestamp);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to read rowkey: ", e);
+      }
+    }
+
+    /**
+     * Readable form, used in error messages.
+     */
+    @Override
+    public String toString() {
+      return "AccessTrackerKey{name=" + name + ", containerName=" + containerName + ", timestamp=" + timestamp + "}";
+    }
+  }
+
+  /**
+   * Timer task that asks the tracker to persist itself if enough time has passed.
+   */
+  private static class Persister extends TimerTask {
+    PersistentAccessTracker tracker;
+    public Persister(PersistentAccessTracker tracker) {
+      this.tracker = tracker;
+    }
+    /**
+     * The action to be performed by this timer task.
+     */
+    @Override
+    public void run() {
+      try {
+        tracker.persist(false);
+      } catch (RuntimeException e) {
+        // An exception escaping a TimerTask terminates the timer thread, which would
+        // silently stop every later periodic persist; log and keep the timer alive.
+        LOG.error("Unable to persist access tracker.", e);
+      }
+    }
+  }
+
+  Object sync = new Object();
+  HTableInterface accessTrackerTable;
+  String accessTrackerColumnFamily;
+  AccessTracker underlyingTracker;
+  long timestamp = System.currentTimeMillis();
+  String name;
+  String containerName;
+  private Timer timer;
+  long maxMillisecondsBetweenPersists;
+
+  /**
+   * @param name the tracker name used in the row key
+   * @param containerName the container name used in the row key
+   * @param accessTrackerTable the table snapshots are written to; closed by {@link #cleanup()}
+   * @param columnFamily the column family snapshots are written to
+   * @param underlyingTracker the tracker that actually records accesses
+   * @param maxMillisecondsBetweenPersists period of the persist timer; no timer task is
+   *        scheduled when this is not positive
+   */
+  public PersistentAccessTracker( String name
+                                , String containerName
+                                , HTableInterface accessTrackerTable
+                                , String columnFamily
+                                , AccessTracker underlyingTracker
+                                , long maxMillisecondsBetweenPersists
+                                )
+  {
+    this.containerName = containerName;
+    this.accessTrackerTable = accessTrackerTable;
+    this.name = name;
+    this.accessTrackerColumnFamily = columnFamily;
+    this.underlyingTracker = underlyingTracker;
+    this.maxMillisecondsBetweenPersists = maxMillisecondsBetweenPersists;
+    timer = new Timer();
+    if(maxMillisecondsBetweenPersists > 0) {
+      timer.scheduleAtFixedRate(new Persister(this), maxMillisecondsBetweenPersists, maxMillisecondsBetweenPersists);
+    }
+  }
+
+  /**
+   * Writes the wrapped tracker to the table and resets it.
+   *
+   * A failed write is logged and leaves both the tracker contents and the window start
+   * timestamp untouched.
+   *
+   * @param force when true persist unconditionally; otherwise only if at least
+   *        {@code maxMillisecondsBetweenPersists} have elapsed since the last persist
+   */
+  public void persist(boolean force) {
+    synchronized(sync) {
+      if(force || (System.currentTimeMillis() - timestamp) >= maxMillisecondsBetweenPersists) {
+        //persist
+        try {
+          AccessTrackerUtil.INSTANCE.persistTracker(accessTrackerTable, accessTrackerColumnFamily, new AccessTrackerKey(name, containerName, timestamp), underlyingTracker);
+          timestamp = System.currentTimeMillis();
+          reset();
+        } catch (IOException e) {
+          LOG.error("Unable to persist access tracker.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Records the access in the wrapped tracker and persists immediately if that filled it.
+   */
+  @Override
+  public void logAccess(LookupKey key) {
+    synchronized (sync) {
+      underlyingTracker.logAccess(key);
+      if (isFull()) {
+        persist(true);
+      }
+    }
+  }
+
+  @Override
+  public void configure(Map<String, Object> config) {
+    underlyingTracker.configure(config);
+  }
+
+  @Override
+  public boolean hasSeen(LookupKey key) {
+    synchronized(sync) {
+      return underlyingTracker.hasSeen(key);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return underlyingTracker.getName();
+  }
+
+  /**
+   * Unions the wrapped tracker of another {@link PersistentAccessTracker} into this one.
+   *
+   * @param tracker must be a {@link PersistentAccessTracker}
+   * @return this tracker
+   * @throws ClassCastException if {@code tracker} is of another type
+   */
+  @Override
+  public AccessTracker union(AccessTracker tracker) {
+    PersistentAccessTracker t1 = (PersistentAccessTracker)tracker;
+    // Hold the same lock as every other accessor of underlyingTracker, since the
+    // timer thread may be persisting and resetting it concurrently.
+    synchronized(sync) {
+      underlyingTracker = underlyingTracker.union(t1.underlyingTracker);
+    }
+    return this;
+  }
+
+  @Override
+  public void reset() {
+    synchronized(sync) {
+      underlyingTracker.reset();
+    }
+  }
+
+  @Override
+  public boolean isFull() {
+    synchronized (sync) {
+      return underlyingTracker.isFull();
+    }
+  }
+
+  /**
+   * Stops the persist timer, makes a final best-effort persist, then releases the wrapped
+   * tracker and closes the table.
+   *
+   * @throws IOException if cleaning up the wrapped tracker or closing the table fails
+   */
+  @Override
+  public void cleanup() throws IOException {
+    synchronized(sync) {
+      // Stop the timer so its thread does not outlive this tracker.
+      timer.cancel();
+      try {
+        persist(true);
+      }
+      catch(Throwable t) {
+        LOG.error("Unable to persist underlying tracker", t);
+      }
+      try {
+        underlyingTracker.cleanup();
+      }
+      finally {
+        // Close the table even when the wrapped tracker fails to clean up.
+        accessTrackerTable.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
new file mode 100644
index 0000000..096e206
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/handler/Handler.java
@@ -0,0 +1,13 @@
+package org.apache.metron.reference.lookup.handler;
+
+import org.apache.metron.reference.lookup.LookupKey;
+
+import java.io.IOException;
+
+/**
+ * Contract for looking up a key against some backing store.
+ *
+ * Type parameters: CONTEXT_T is a caller-supplied context object passed to every call,
+ * KEY_T is the key type (must be a LookupKey) and RESULT_T is what a successful get returns.
+ * Implementations are AutoCloseable, so they are expected to provide close().
+ *
+ * NOTE(review): the logAccess flag presumably controls whether the lookup is recorded
+ * in an access tracker; this interface does not show it, so confirm against implementations.
+ *
+ * Created by cstella on 2/5/16.
+ */
+public interface Handler<CONTEXT_T, KEY_T extends LookupKey, RESULT_T> extends AutoCloseable{
+ boolean exists(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException; // existence check for the key
+ RESULT_T get(KEY_T key, CONTEXT_T context, boolean logAccess) throws IOException; // fetches the result for the key
+}