Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:22 UTC
[34/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
new file mode 100644
index 0000000..a123cc3
--- /dev/null
+++ b/metron-platform/metron-data-management/README.md
@@ -0,0 +1,252 @@
+# metron-data-management
+
+This project is a collection of classes to assist with loading various
+enrichment and threat intelligence sources into Metron.
+
+## Simple HBase Enrichments/Threat Intelligence
+
+The vast majority of enrichments and threat intelligence processing tend
+toward the following pattern:
+* Take a field
+* Look up the field in a key/value store
+* If the key exists, then either it's a threat to be alerted or it should be enriched with the value associated with the key.
+
+As such, we have created this capability as a default threat intel and enrichment adapter. The basic primitive for simple enrichments and threat intelligence sources
+is a complex key containing the following:
+* Type : The type of threat intel or enrichment (e.g. malicious_ip)
+* Indicator : The indicator in question
+* Value : The value to associate with the type, indicator pair. This is a JSON map.
+
+At present, all of the data load utilities function by converting raw data
+sources to this primitive key (type, indicator) and value to be placed in HBase.
+
+In the case of threat intel, a hit on the threat intel table will result
+in:
+* The `is_alert` field being set to `true` in the index
+* A field named `threatintels.hbaseThreatIntel.$field.$threatintel_type` is set to `alert`
+ * `$field` is the field in the original document that was a match (e.g. `src_ip_addr`)
+ * `$threatintel_type` is the type of threat intel imported (defined in the Extractor configuration below).
+
+In the case of simple HBase enrichment, a hit on the enrichments table
+will result in the following new field for each key in the value: `enrichments.hbaseEnrichment.$field.$enrichment_type.$key`
+* `$field` is the field in the original document that was a match (e.g. `src_ip_addr`)
+* `$enrichment_type` is the type of enrichment imported (defined in the Extractor configuration below).
+* `$key` is a key in the JSON map associated with the row in HBase.
+
+For instance, suppose we had the following very silly key/value in
+the enrichment table in HBase:
+* indicator: `127.0.0.1`
+* type : `important_addresses`
+* value: `{ "name" : "localhost", "location" : "home" }`
+
+If we had a document whose `ip_src_addr` came through with a value of
+`127.0.0.1`, we would have the following fields added to the indexed
+document:
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.name` : `localhost`
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.location` : `home`
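The field-naming convention above can be sketched as a small helper (a hypothetical illustration, not part of Metron itself) that flattens an HBase enrichment value into the fields added to the indexed document:

```python
def enrichment_fields(field, enrichment_type, value):
    """Flatten an HBase enrichment value (a JSON map) into the
    enrichments.hbaseEnrichment.$field.$enrichment_type.$key fields
    added to the indexed document."""
    prefix = "enrichments.hbaseEnrichment.%s.%s" % (field, enrichment_type)
    return {"%s.%s" % (prefix, key): val for key, val in value.items()}

# The important_addresses example from above:
print(enrichment_fields("ip_src_addr", "important_addresses",
                        {"name": "localhost", "location": "home"}))
```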
+
+## Extractor Framework
+
+For the purpose of ingesting data of a variety of formats, we have
+created an Extractor framework which allows for common data formats to
+be interpreted as enrichment or threat intelligence sources. The
+formats supported at present are:
+* CSV (both threat intel and enrichment)
+* STIX (threat intel only)
+* Custom (pass your own class)
+
+All of the current utilities take a JSON file to configure how to
+interpret input data. This JSON describes the type of data and,
+where the format is not fixed (as it is in STIX, for example), the schema of the data.
+
+### CSV Extractor
+
+Consider the following example configuration file which
+describes how to process a CSV file.
+
+````
+{
+ "config" : {
+ "columns" : {
+ "ip" : 0
+ ,"source" : 2
+ }
+ ,"indicator_column" : "ip"
+ ,"type" : "malicious_ip"
+ ,"separator" : ","
+ }
+ ,"extractor" : "CSV"
+}
+````
+
+In this example, we have described the schema to the extractor via the `columns` field:
+two columns, `ip` at the first position and `source` at the third. We have indicated that the `ip` column holds the indicator
+and that the enrichment type is named `malicious_ip`. We have also indicated that the extractor to use is the CSV Extractor.
+The other options are the STIX extractor or the fully qualified class name of a custom extractor.
+
+The `source` column's values will show up in the value in HBase because it is called out as a non-indicator column, and the key
+for the value will be `source`. For instance, given an input line of `123.45.123.12,something,the grapevine`, the following key and value
+would be extracted:
+* Indicator : `123.45.123.12`
+* Type : `malicious_ip`
+* Value : `{ "source" : "the grapevine" }`
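The extraction above can be sketched in a few lines (a simplified illustration only; the real CSV extractor handles quoting and edge cases that a bare `split` does not):

```python
def extract_csv_line(line, config):
    """Sketch of the CSV extraction described above: pull the indicator
    out of the indicator column and put every other configured column
    into the JSON value map."""
    fields = line.split(config["separator"])
    columns = config["columns"]                  # column name -> position
    indicator_col = config["indicator_column"]
    indicator = fields[columns[indicator_col]].strip()
    value = {name: fields[pos].strip()
             for name, pos in columns.items() if name != indicator_col}
    return indicator, config["type"], value

config = {"columns": {"ip": 0, "source": 2},
          "indicator_column": "ip",
          "type": "malicious_ip",
          "separator": ","}
print(extract_csv_line("123.45.123.12,something,the grapevine", config))
# ('123.45.123.12', 'malicious_ip', {'source': 'the grapevine'})
```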
+
+### STIX Extractor
+
+Consider the following config for importing STIX documents. STIX is a threat intelligence interchange
+format, so it is particularly relevant and attractive data to import for our purposes. Because STIX is
+a standard format, there is no need to specify the schema or how to interpret the documents.
+
+We support a subset of STIX messages for importation:
+
+| STIX Type | Specific Type | Enrichment Type Name |
+|-----------|---------------|----------------------|
+| Address | IPV_4_ADDR | address:IPV_4_ADDR |
+| Address | IPV_6_ADDR | address:IPV_6_ADDR |
+| Address | E_MAIL | address:E_MAIL |
+| Address | MAC | address:MAC |
+| Domain | FQDN | domain:FQDN |
+| Hostname | | hostname |
+
+
+NOTE: The "Enrichment Type Name" above is used as the type portion of the key stored in HBase.
+
+Consider the following configuration for an Extractor:
+
+````
+{
+ "config" : {
+ "stix_address_categories" : "IPV_4_ADDR"
+ }
+ ,"extractor" : "STIX"
+}
+````
+
+Here, we configure the STIX extractor to load from a series of STIX files, but we only want to bring in IPv4
+addresses from the set of all possible addresses. Note that if no categories are specified for import, all are assumed.
+Also, only address and domain types allow filtering via `stix_address_categories` and `stix_domain_categories` config
+parameters.
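The category filtering can be pictured with a hypothetical helper (the real extractor parses STIX XML via the MITRE libraries; this only illustrates the filtering rule):

```python
def indicator_allowed(stix_type, category, config):
    """Return True if an indicator of the given STIX type/category passes
    the optional category filters. Only Address and Domain types are
    filterable; when no categories are configured, everything passes."""
    key = {"Address": "stix_address_categories",
           "Domain": "stix_domain_categories"}.get(stix_type)
    if key is None or key not in config:
        return True  # unfilterable type, or no filter configured
    allowed = {c.strip() for c in config[key].split(",")}
    return category in allowed

config = {"stix_address_categories": "IPV_4_ADDR"}
print(indicator_allowed("Address", "IPV_4_ADDR", config))  # True
print(indicator_allowed("Address", "IPV_6_ADDR", config))  # False
print(indicator_allowed("Hostname", None, config))         # True
```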
+
+## Enrichment Config
+
+In order to automatically add new enrichment and threat intel types to existing, running enrichment topologies, you will
+need to add new fields and new types to the ZooKeeper configuration. A convenience parameter has been provided to assist with this
+during an import. Namely, you can specify the enrichment configs and how they associate with the fields of the
+documents flowing through the enrichment topology.
+
+Consider the following Enrichment Configuration JSON. This one is for a threat intelligence type:
+
+````
+{
+ "zkQuorum" : "localhost:2181"
+ ,"sensorToFieldList" : {
+ "bro" : {
+ "type" : "THREAT_INTEL"
+ ,"fieldToEnrichmentTypes" : {
+ "ip_src_addr" : [ "malicious_ip" ]
+ ,"ip_dst_addr" : [ "malicious_ip" ]
+ }
+ }
+ }
+}
+````
+
+We have to specify the following:
+* The ZooKeeper quorum which holds the cluster configuration
+* The mapping between the fields in the enriched documents and the enrichment types
+
+This configuration allows the ingestion tools to update ZooKeeper post-ingestion so that the enrichment topology can immediately
+take advantage of the new type.
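The effect of `sensorToFieldList` on a running sensor's config can be sketched as a merge of new field-to-type mappings (the dict shape here is simplified and hypothetical; it is not the exact ZooKeeper document):

```python
def merge_field_types(sensor_config, field_to_types):
    """Illustrative only: merge new field -> enrichment-type mappings into
    a sensor's field map, the way the loader updates the ZooKeeper-held
    config after ingestion completes."""
    field_map = sensor_config.setdefault("fieldToTypeMap", {})
    for field, types in field_to_types.items():
        existing = set(field_map.get(field, []))
        field_map[field] = sorted(existing | set(types))
    return sensor_config

# The bro example from above, merged into a pre-existing config:
bro = {"fieldToTypeMap": {"ip_src_addr": ["malicious_domain"]}}
merge_field_types(bro, {"ip_src_addr": ["malicious_ip"],
                        "ip_dst_addr": ["malicious_ip"]})
print(bro["fieldToTypeMap"])
```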
+
+
+## Loading Utilities
+
+The two configurations above are used in three separate ingestion tools:
+* Taxii Loader
+* Bulk load from HDFS via MapReduce
+* Flat File ingestion
+
+### Taxii Loader
+
+The shell script `$METRON_HOME/bin/threatintel_taxii_load.sh` can be used to poll a Taxii server for STIX documents and ingest them into HBase.
+It is quite common for this Taxii server to be an aggregation server such as Soltra Edge.
+
+In addition to the Enrichment and Extractor configs described above, this loader requires a configuration file describing the connection information
+to the Taxii server. An illustrative example of such a configuration file is:
+
+````
+{
+ "endpoint" : "http://localhost:8282/taxii-discovery-service"
+ ,"type" : "DISCOVER"
+ ,"collection" : "guest.Abuse_ch"
+ ,"table" : "threat_intel"
+ ,"columnFamily" : "cf"
+ ,"allowedIndicatorTypes" : [ "domainname:FQDN", "address:IPV_4_ADDR" ]
+}
+````
+
+As you can see, we are specifying the following information:
+* endpoint : The URL of the endpoint
+* type : `POLL` or `DISCOVER` depending on the endpoint.
+* collection : The Taxii collection to ingest
+* table : The HBase table to import into
+* columnFamily : The column family to import into
+* allowedIndicatorTypes : an array of acceptable threat intel types (see the "Enrichment Type Name" column of the STIX table above for the possibilities).
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code | Is Required? | Description |
+|------------|---------------------------|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h | | No | Generate the help screen/set of options |
+| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source |
+| -c | --taxii_connection_config | Yes | The JSON config file to configure the connection |
+| -p | --time_between_polls | No | The time between polling the Taxii server in milliseconds. (default: 1 hour) |
+| -b | --begin_time | No | Start time to poll the Taxii server (all data from that point will be gathered in the first pull). The format for the date is yyyy-MM-dd HH:mm:ss |
+| -l | --log4j | No | The Log4j Properties to load |
+| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
+
+
+### Bulk Load from HDFS
+
+The shell script `$METRON_HOME/bin/threatintel_bulk_load.sh` will kick off a MapReduce job to load data staged in HDFS into an HBase table. Note: despite what
+the naming may suggest, this utility works for enrichment as well as threat intel because the underlying infrastructure is the same.
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code | Is Required? | Description |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------|
+| -h | | No | Generate the help screen/set of options |
+| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source |
+| -t | --table | Yes | The HBase table to import into |
+| -f | --column_family | Yes | The HBase table column family to import into |
+| -i | --input | Yes | The input data location on HDFS |
+| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
+
+### Flatfile Loader
+
+The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk and load the enrichment or threat intel data into an HBase table.
+Note: This utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
+
+One thing to note here is that there is a configuration
+parameter in the Extractor config that is only considered by this
+loader:
+* inputFormatHandler : This specifies how to treat the input data. The two implementations are `BY_LINE` and `org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat`.
+
+The default is `BY_LINE`, which makes sense for a list of CSVs where
+each line indicates a unit of information which can be imported.
+However, if you are importing a set of STIX documents, then you want
+each document to be considered as input to the Extractor.
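As a sketch (a hypothetical helper, not the loader's actual implementation), the two behaviors amount to:

```python
import os
import tempfile

def units_of_input(path, handler="BY_LINE"):
    """BY_LINE yields one extraction unit per non-empty line (a CSV row);
    any other handler treats the whole file as a single unit (e.g. one
    STIX document)."""
    with open(path) as f:
        if handler == "BY_LINE":
            return [line.rstrip("\n") for line in f if line.strip()]
        return [f.read()]

with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
    tmp.write("1.2.3.4,x,feed\n5.6.7.8,y,feed\n")

print(len(units_of_input(tmp.name, "BY_LINE")))     # one unit per row
print(len(units_of_input(tmp.name, "WHOLE_FILE")))  # a single unit
os.unlink(tmp.name)
```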
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code | Is Required? | Description |
+|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h | | No | Generate the help screen/set of options |
+| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source |
+| -t | --hbase_table | Yes | The HBase table to import into |
+| -c | --hbase_cf | Yes | The HBase table column family to import into |
+| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. |
+| -l | --log4j | No | The log4j properties file to load |
+| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml
new file mode 100644
index 0000000..6c3f866
--- /dev/null
+++ b/metron-platform/metron-data-management/pom.xml
@@ -0,0 +1,327 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-platform</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>metron-data-management</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <httpcore.version>4.3.2</httpcore.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.2.11</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>Saxon-HE</artifactId>
+ <version>9.5.1-5</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ <version>2.2.5-2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mitre</groupId>
+ <artifactId>stix</artifactId>
+ <version>1.2.0.2</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.googlecode.disruptor</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-enrichment</artifactId>
+      <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-hbase</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mitre.taxii</groupId>
+ <artifactId>taxii</artifactId>
+ <version>1.1.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${global_opencsv_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch.randomizedtesting</groupId>
+ <artifactId>randomizedtesting-runner</artifactId>
+ <version>2.1.14</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>4.10.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>4.10.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-integration-test</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m -XX:-UseSplitVerifier</argLine>
+ <skip>true</skip>
+ <trimStackTrace>false</trimStackTrace>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.metron.httpcore.dataload</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <excludes>
+ <exclude>classworlds:classworlds</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>jmock:*</exclude>
+ <exclude>*:xml-apis</exclude>
+ <exclude>*slf4j*</exclude>
+ <exclude>org.apache.maven:lib:tests</exclude>
+ <exclude>log4j:log4j:jar:</exclude>
+ <exclude>*:hbase:*</exclude>
+ <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/assembly/assembly.xml b/metron-platform/metron-data-management/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..c2c384b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/assembly/assembly.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<assembly>
+ <id>archive</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}/src/main/bash</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <excludes>
+ <exclude>**/*.formatted</exclude>
+ <exclude>**/*.filtered</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <lineEnding>unix</lineEnding>
+ <filtered>true</filtered>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/target</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ <outputDirectory>/lib</outputDirectory>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py b/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
new file mode 100755
index 0000000..2091418
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
@@ -0,0 +1,208 @@
+#!/usr/bin/python
+
+"""
+Copyright 2014 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import os
+import csv
+import json
+import multiprocessing
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+
+def is_field_excluded(fieldname=None):
+ """
+ Checks to see if a field name is a member of a list of names to exclude. Modify to suit your own list.
+
+ :param fieldname: A string representing a field name
+ :return: True or False
+ """
+ import re
+
+ # List of fields names to exclude
+ excluded_fields = [
+ 'Audit_auditUpdatedDate',
+ #'domainName'
+ ]
+
+ if fieldname in excluded_fields:
+ return True
+
+ # Regexes to match for exclusion
+ excluded_regexes = [
+ ['_rawText$', re.IGNORECASE],
+ ]
+
+ for regex in excluded_regexes:
+ if re.search(regex[0], fieldname, regex[1]):
+ return True
+
+ return False
+
+
+def process_csv(in_filename, out_filename):
+ """
+ Processes a CSV file of WHOIS data and converts each line to a JSON element, skipping specific fields that
+ are not deemed necessary (domainName, *_rawText, Audit_auditUpdatedDate)
+
+ :param in_filename: Input CSV filename with full path
+ :param out_filename: Output JSON filename with full path
+ :return: None
+ """
+ if out_filename:
+ out_fh = open(out_filename, 'wb')
+ logging.debug('%s: Converting %s to %s' % (multiprocessing.current_process().name, in_filename, out_filename))
+ else:
+ logging.debug('%s: Analyzing %s' % (multiprocessing.current_process().name, in_filename))
+
+ with open(in_filename, 'rb') as f:
+ reader = csv.DictReader(f, delimiter=',', quotechar='"')
+ line_num = 0
+ try:
+ for row in reader:
+ line_num += 1
+ try:
+ if out_filename:
+ # json conversion and output
+ new_row = {}
+ for field in reader.fieldnames:
+ # fields we don't want include these + anything with rawText
+ #if field not in ['Audit_auditUpdatedDate', 'domainName'] and not field.endswith('_rawText'):
+ if not is_field_excluded(field):
+ new_row[field] = row.get(field)
+ json.dump(new_row, out_fh)
+ out_fh.write('\n')
+ else:
+ # analysis .. check to be sure fileheader and csv row counts match
+ if len(row) != len(reader.fieldnames):
+ raise Exception('Field count mismatch: row: %s / fields: %s' % (len(row), len(reader.fieldnames)))
+ except Exception, e:
+ logging.warn("Error with file %s, line %s: %s" % (in_filename, line_num, e))
+
+ if not out_filename:
+ logging.info('Analyzed %s: OK' % in_filename)
+ except Exception, e:
+ logging.warn(e)
+
+    if out_filename:
+        out_fh.close()
+
+
+##-------------------------------------------------------------------------
+
+def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
+ """
+    Generates a multiprocessing.Pool() queue with a list of input and output files to be processed with process_csv.
+    Files are added by walking the source_dir and adding any file with a CSV extension. Output is placed into a single
+    directory for processing. Output filenames are generated using the last part of the directory name, so a file
+    named source_dir/com/1.csv would become output_dir/com_1.json
+
+ :param source_dir: Source directory of CSV files
+ :param output_dir: Output directory for resultant JSON files
+ :param max_processes: Maximum number of processes run
+    :param overwrite: If True, overwrite existing output files
+    :return: None
+ """
+ logging.info("Processing Whois files from %s" % source_dir)
+
+ if output_dir and not os.path.exists(output_dir):
+ logging.debug("Creating output directory %s" % output_dir)
+ os.makedirs(output_dir)
+
+ logging.info("Starting %s pool workers" % max_processes)
+
+ if sys.version.startswith('2.6'):
+ # no maxtaskperchild in 2.6
+ pool = multiprocessing.Pool(processes=max_processes)
+ else:
+ pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)
+
+ filecount = 0
+ for dirname, dirnames, filenames in os.walk(source_dir):
+ for filename in filenames:
+ if filename.endswith('.csv'):
+ # output files go to outputDir and are named using the last subdirectory from the dirname
+ if output_dir:
+ out_filename = filename.replace('csv', 'json')
+ out_filename = os.path.join(output_dir, '%s_%s' % (os.path.split(dirname)[-1], out_filename))
+
+ # if file does not exist or if overwrite is true, add file process to the pool
+ if not os.path.isfile(out_filename) or overwrite:
+ pool.apply_async(process_csv, args=(os.path.join(dirname, filename), out_filename))
+ filecount += 1
+ else:
+ logging.info("Skipping %s, %s exists and overwrite is false" % (filename, out_filename))
+ else:
+ # no outputdir so we just analyze the files
+ pool.apply_async(process_csv, args=(os.path.join(dirname, filename), None))
+ filecount += 1
+
+ try:
+ pool.close()
+ logging.info("Starting activities on %s CSV files" % filecount)
+ pool.join()
+ except KeyboardInterrupt:
+ logging.info("Aborting")
+ pool.terminate()
+
+ logging.info("Completed")
+
+
+##-------------------------------------------------------------------------
+
+if __name__ == "__main__":
+
+ max_cpu = multiprocessing.cpu_count()
+
+ from optparse import OptionParser
+ parser = OptionParser()
+ parser.add_option('-s', '--source', dest='source_dir', action='store',
+ help='Source directory to walk for CSV files')
+ parser.add_option('-o', '--output', dest='out_dir', action='store',
+ help='Output directory for JSON files')
+ parser.add_option('-O', '--overwrite', dest='overwrite', action='store_true',
+ help='Overwrite existing files in output directory')
+ parser.add_option('-p', '--processes', dest='max_processes', action='store', default=max_cpu, type='int',
+ help='Max number of processes to spawn')
+ parser.add_option('-a', '--analyze', dest='analyze', action='store_true',
+ help='Analyze CSV files for validity, no file output')
+ parser.add_option('-d', '--debug', dest='debug', action='store_true',
+ help='Enable debug messages')
+
+ (options, args) = parser.parse_args()
+
+ if not options.source_dir:
+ logging.error("Source directory required")
+ sys.exit(-1)
+
+    if options.analyze:
+        out_dir = None
+    elif not options.out_dir:
+        logging.error("Output directory or analysis option required")
+        sys.exit(-1)
+    else:
+        out_dir = options.out_dir
+
+ if options.max_processes > max_cpu:
+ logging.warn('Max Processes (%s) is greater than available Processors (%s)' % (options.max_processes, max_cpu))
+
+ if options.debug:
+ # enable debug level and multiprocessing debugging
+ logging.basicConfig(level=logging.DEBUG)
+ multiprocessing.log_to_stderr(logging.DEBUG)
+
+ process_files(options.source_dir, out_dir, options.max_processes, options.overwrite)
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh b/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
new file mode 100755
index 0000000..e464984
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh b/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
new file mode 100644
index 0000000..aed6782
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+yarn jar /usr/metron/${project.version}/lib/metron-data-management-${project.version}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner "$@"
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh b/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
new file mode 100644
index 0000000..b37e022
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+yarn jar /usr/metron/${project.version}/lib/metron-data-management-${project.version}.jar org.apache.metron.dataloads.bulk.HDFSDataPruner "$@"
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
new file mode 100755
index 0000000..2df4ee3
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
new file mode 100755
index 0000000..c3ec233
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
new file mode 100755
index 0000000..321041a
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.nonbulk.taxii.TaxiiLoader "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
new file mode 100644
index 0000000..98e3b52
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+public abstract class DataPruner {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DataPruner.class);
+ protected long firstTimeMillis;
+ protected long lastTimeMillis;
+ protected String wildCard;
+
+ public DataPruner(Date startDate, Integer numDays, String wildCard) throws StartDateException {
+
+ Date startAtMidnight = dateAtMidnight(startDate);
+ this.lastTimeMillis = startDate.getTime();
+ this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
+ this.wildCard = wildCard;
+
+ Date today = dateAtMidnight(new Date());
+
+ if (!today.after(startAtMidnight)) {
+ throw new StartDateException("Prune Start Date must be prior to today");
+ }
+ }
+
+ protected Date dateAtMidnight(Date date) {
+
+ Calendar calendar = Calendar.getInstance();
+
+ calendar.setTime(date);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ return calendar.getTime();
+
+ }
+
+
+ public abstract Long prune() throws IOException;
+
+
+}
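The constructor above derives the prune window from a start date and a day count: lastTimeMillis is the start, and firstTimeMillis is numDays earlier. A minimal standalone sketch of that arithmetic follows; the class and method names are hypothetical, chosen for illustration only.

```java
import java.util.concurrent.TimeUnit;

public class PruneWindowSketch {

    // Mirrors DataPruner's window arithmetic: the window covers
    // [startMillis - numDays, startMillis) in epoch milliseconds.
    static boolean inPruneWindow(long candidateMillis, long startMillis, int numDays) {
        long firstTimeMillis = startMillis - TimeUnit.DAYS.toMillis(numDays);
        return candidateMillis >= firstTimeMillis && candidateMillis < startMillis;
    }

    public static void main(String[] args) {
        long start = 1_600_000_000_000L; // hypothetical prune start (window end, exclusive)
        // 10 days before the start falls inside a 30-day window
        System.out.println(inPruneWindow(start - TimeUnit.DAYS.toMillis(10), start, 30));
        // the start itself is excluded (half-open interval)
        System.out.println(inPruneWindow(start, start, 30));
    }
}
```

Note the half-open interval: a timestamp exactly equal to the start date is not pruned, matching the `>= first && < last` checks used by both pruner subclasses.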
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
new file mode 100644
index 0000000..ddbb61b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.common.configuration.Configuration;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+
+public class ElasticsearchDataPruner extends DataPruner {
+
+ private String indexPattern;
+ private SimpleDateFormat dateFormat;
+ protected Client indexClient = null;
+ protected Configuration configuration;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+ private static final String defaultDateFormat = "yyyy.MM.dd.HH";
+
+
+
+ private Predicate<String> filterWithRegex = new Predicate<String>() {
+
+ @Override
+ public boolean apply(String str) {
+
+ try {
+ String dateString = str.substring(indexPattern.length());
+ Date indexCreateDate = dateFormat.parse(dateString);
+ long indexCreatedDate = indexCreateDate.getTime();
+ if (indexCreatedDate >= firstTimeMillis && indexCreatedDate < lastTimeMillis) {
+ return true;
+ }
+ } catch (ParseException e) {
+ LOG.error("Unable to parse date from " + str.substring(indexPattern.length()), e);
+ }
+
+ return false;
+ }
+
+ };
+
+ public ElasticsearchDataPruner(Date startDate, Integer numDays, Configuration configuration, Client indexClient, String indexPattern) throws Exception {
+
+ super(startDate, numDays, indexPattern);
+
+ this.indexPattern = indexPattern;
+ this.dateFormat = new SimpleDateFormat(defaultDateFormat);
+ this.configuration = configuration;
+ this.indexClient = indexClient;
+
+
+ }
+
+ @Override
+ public Long prune() throws IOException {
+
+ try {
+
+ configuration.update();
+
+ }
+ catch(Exception e) {
+
+ LOG.error("Unable to update configs",e);
+
+ }
+
+ Object dateFormatConfig = configuration.getGlobalConfig().get("es.date.format");
+
+ if( null != dateFormatConfig ){
+ dateFormat = new SimpleDateFormat(dateFormatConfig.toString());
+ }
+
+ ImmutableOpenMap<String, IndexMetaData> allIndices = indexClient.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
+ Iterable indicesForDeletion = getFilteredIndices(allIndices);
+ Object[] indexArray = IteratorUtils.toArray(indicesForDeletion.iterator());
+
+ if(indexArray.length > 0) {
+ String[] indexStringArray = new String[indexArray.length];
+ System.arraycopy(indexArray, 0, indexStringArray, 0, indexArray.length);
+ deleteIndex(indexClient.admin(), indexStringArray);
+ }
+
+ return new Long(indexArray.length);
+
+ }
+
+ public Boolean deleteIndex(AdminClient adminClient, String... index) {
+
+ boolean isAcknowledged = adminClient.indices().delete(adminClient.indices().prepareDelete(index).request()).actionGet().isAcknowledged();
+ return new Boolean(isAcknowledged);
+
+ }
+
+ protected Iterable<String> getFilteredIndices(ImmutableOpenMap<String, IndexMetaData> indices) {
+
+ String[] returnedIndices = new String[indices.size()];
+ Iterator it = indices.keysIt();
+ System.arraycopy(IteratorUtils.toArray(it), 0, returnedIndices, 0, returnedIndices.length);
+ Iterable<String> matches = Iterables.filter(Arrays.asList(returnedIndices), filterWithRegex);
+
+ return matches;
+
+ }
+
+}
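The filterWithRegex predicate above selects indices by stripping the configured prefix from each index name and parsing the remainder with the `yyyy.MM.dd.HH` date format. A self-contained sketch of that filtering logic, with hypothetical names, assuming the same default format:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class IndexNameFilterSketch {

    // Mirrors the predicate in ElasticsearchDataPruner: strip the index
    // prefix, parse the rest as yyyy.MM.dd.HH, and keep indices whose
    // creation date falls in [first, last). Unparseable names are skipped.
    static boolean inWindow(String index, String prefix, long first, long last) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy.MM.dd.HH");
        try {
            Date created = fmt.parse(index.substring(prefix.length()));
            long t = created.getTime();
            return t >= first && t < last;
        } catch (ParseException | StringIndexOutOfBoundsException e) {
            return false; // name too short or not a date: never pruned
        }
    }

    public static void main(String[] args) throws ParseException {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy.MM.dd.HH");
        long first = fmt.parse("2016.04.01.00").getTime();
        long last = fmt.parse("2016.04.10.00").getTime();
        System.out.println(inWindow("bro_index_2016.04.05.12", "bro_index_", first, last));
        System.out.println(inWindow("bro_index_2016.04.20.12", "bro_index_", first, last));
    }
}
```

Because unparseable names simply fall out of the filter, a misconfigured prefix silently prunes nothing rather than failing, which is why the parse error is logged in the real class.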
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
new file mode 100644
index 0000000..f0a4d3b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.configuration.Configuration;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class ElasticsearchDataPrunerRunner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+
+ public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+ /**
+ * Example
+ * start=$(date -d '30 days ago' +%m/%d/%Y)
+ * yarn jar Metron-DataLoads-{VERSION}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner -i host1:9300 -p '/bro_index_' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+ * echo ${start}
+ **/
+
+ Options options = buildOptions();
+ Options help = new Options();
+ TransportClient client = null;
+
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ help.addOption(o);
+
+
+
+ try {
+
+ CommandLine cmd = checkOptions(help,options, argv);
+
+ String start = cmd.getOptionValue("s");
+ Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+
+ Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+ String indexPrefix = cmd.getOptionValue("p");
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Running prune with args: " + startDate + " " + numDays);
+ }
+
+ Configuration configuration = null;
+
+ if( cmd.hasOption("z")){
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework framework = CuratorFrameworkFactory.newClient(cmd.getOptionValue("z"),retryPolicy);
+ framework.start();
+ configuration = new Configuration(framework);
+
+ } else if ( cmd.hasOption("c") ){
+
+ String resourceFile = cmd.getOptionValue("c");
+ configuration = new Configuration(Paths.get(resourceFile));
+
+ }
+
+ configuration.update();
+
+ Map<String, Object> globalConfiguration = configuration.getGlobalConfig();
+ ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+ builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+ builder.put("client.transport.ping_timeout","500s");
+ client = new TransportClient(builder.build())
+ .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
+
+ DataPruner pruner = new ElasticsearchDataPruner(startDate, numDays, configuration, client, indexPrefix);
+
+ LOG.info("Pruned " + pruner.prune() + " indices from " + globalConfiguration.get("es.ip") + ":" + globalConfiguration.get("es.port") + "/" + indexPrefix);
+
+
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ System.exit(-1);
+
+ } finally {
+
+ if( null != client) {
+ client.close();
+ }
+
+ }
+
+ }
+
+ public static CommandLine checkOptions(Options help, Options options, String ... argv) throws ParseException {
+
+ CommandLine cmd = null;
+ CommandLineParser parser = new PosixParser();
+
+
+ try {
+
+ cmd = parser.parse(help,argv,true);
+
+ if( cmd.getOptions().length > 0){
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ System.exit(0);
+ }
+
+ cmd = parser.parse(options, argv);
+
+ } catch (ParseException e) {
+
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ throw e;
+
+ }
+
+
+ if( (cmd.hasOption("z") && cmd.hasOption("c")) || (!cmd.hasOption("z") && !cmd.hasOption("c")) ){
+
+ System.err.println("Exactly one of zookeeper-hosts or config-location is required");
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ throw new RuntimeException("Must specify zookeeper-hosts or config-location, but not both");
+
+ }
+
+ return cmd;
+ }
+
+ public static Options buildOptions(){
+
+ Options options = new Options();
+
+ Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+ o.setArgName("START_DATE");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("n", "numdays", true, "Number of days back to purge");
+ o.setArgName("NUMDAYS");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("p", "index-prefix", true, "Index prefix - e.g. bro_index_");
+ o.setArgName("PREFIX");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("c", "config-location", true, "Directory Path - e.g. /path/to/config/dir");
+ o.setArgName("CONFIG");
+ o.setRequired(false);
+ options.addOption(o);
+
+ o = new Option("z", "zookeeper-hosts", true, "Zookeeper URL - e.g. zkhost1:2181,zkhost2:2181,zkhost3:2181");
+ o.setArgName("ZOOKEEPER_HOSTS");
+ o.setRequired(false);
+ options.addOption(o);
+
+ return options;
+ }
+}
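checkOptions above rejects a command line unless exactly one of zookeeper-hosts (-z) or config-location (-c) is supplied; the `(z && c) || (!z && !c)` test is the negation of exclusive-or. A trivial sketch of that validity check, with hypothetical names:

```java
public class ExclusiveOptionCheck {

    // Mirrors the -z / -c check in checkOptions: the configuration source
    // must come from exactly one of ZooKeeper or a local config directory.
    static boolean exactlyOne(boolean hasZookeeper, boolean hasConfigDir) {
        return hasZookeeper ^ hasConfigDir;
    }

    public static void main(String[] args) {
        System.out.println(exactlyOne(true, false));  // only -z given: valid
        System.out.println(exactlyOne(true, true));   // both given: invalid
        System.out.println(exactlyOne(false, false)); // neither given: invalid
    }
}
```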
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
new file mode 100644
index 0000000..097253c
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class HDFSDataPruner extends DataPruner {
+
+
+ private Path globPath;
+ protected FileSystem fileSystem;
+ protected static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
+
+ HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException, StartDateException {
+
+ super(startDate,numDays,globPath);
+ this.globPath = new Path(wildCard);
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", fsUri);
+ this.fileSystem = FileSystem.get(conf);
+
+ }
+
+
+ public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+ /**
+ * Example
+ * start=$(date -d '30 days ago' +%m/%d/%Y)
+ * yarn jar Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.HDFSDataPruner -f hdfs://ec2-52-36-25-217.us-west-2.compute.amazonaws.com:8020 -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+ * echo ${start}
+ **/
+
+ Options options = new Options();
+ Options help = new Options();
+
+ {
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ help.addOption(o);
+ }
+ {
+ Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+ o.setArgName("START_DATE");
+ o.setRequired(true);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("f", "filesystem", true, "Filesystem uri - e.g. hdfs://host:8020 or file:///");
+ o.setArgName("FILESYSTEM");
+ o.setRequired(true);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("n", "numdays", true, "Number of days back to purge");
+ o.setArgName("NUMDAYS");
+ o.setRequired(true);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("g", "glob-string", true, "Glob filemask for files to delete - e.g. /apps/metron/enrichment/bro_doc/file-*");
+ o.setArgName("GLOBSTRING");
+ o.setRequired(true);
+ options.addOption(o);
+ }
+
+ try {
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+
+ try {
+
+ cmd = parser.parse(help,argv,true);
+ if( cmd.getOptions().length > 0){
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
+ System.exit(0);
+ }
+
+ cmd = parser.parse(options, argv);
+
+ } catch (ParseException pe) {
+
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
+ System.exit(-1);
+
+ }
+
+ String start = cmd.getOptionValue("s");
+ Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+ String fileSystemUri = cmd.getOptionValue("f");
+ Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+ String globString = cmd.getOptionValue("g");
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Running prune with args: " + startDate + " " + numDays + " " + fileSystemUri + " " + globString);
+ }
+
+ DataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
+
+ LOG.info("Pruned " + pruner.prune() + " files from " + fileSystemUri + globString);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+
+ public Long prune() throws IOException {
+
+ long filesPruned = 0L;
+
+ FileStatus[] filesToDelete = fileSystem.globStatus(globPath, new HDFSDataPruner.DateFileFilter(this));
+
+ for (FileStatus fileStatus : filesToDelete) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting File: " + fileStatus.getPath());
+ }
+
+ fileSystem.delete(fileStatus.getPath(), false);
+
+ filesPruned++;
+ }
+
+ return filesPruned;
+ }
+
+ class DateFileFilter extends Configured implements PathFilter {
+
+ HDFSDataPruner pruner;
+ Boolean failOnError = false;
+
+ DateFileFilter(HDFSDataPruner pruner) {
+ this.pruner = pruner;
+ }
+
+ DateFileFilter(HDFSDataPruner pruner, Boolean failOnError) {
+
+ this(pruner);
+ this.failOnError = failOnError;
+
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ try {
+
+ if(pruner.LOG.isDebugEnabled()) {
+ pruner.LOG.debug("ACCEPT - working with file: " + path);
+ }
+
+ if (pruner.fileSystem.isDirectory(path)) {
+ return false;
+
+ }
+ } catch (IOException e) {
+
+ pruner.LOG.error("IOException", e);
+
+ if (failOnError) {
+ throw new RuntimeException(e);
+ }
+
+ return false;
+ }
+
+ try {
+
+ FileStatus file = pruner.fileSystem.getFileStatus(path);
+ long fileModificationTime = file.getModificationTime();
+ boolean accept = false;
+
+ if (fileModificationTime >= pruner.firstTimeMillis && fileModificationTime < pruner.lastTimeMillis) {
+
+ accept = true;
+ }
+
+ return accept;
+
+ } catch (IOException e) {
+
+ pruner.LOG.error("IOException", e);
+
+ if (failOnError) {
+ throw new RuntimeException(e);
+ }
+
+ return false;
+ }
+
+ }
+ }
+}
+
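The `DateFileFilter.accept()` logic above reduces to a half-open interval test on the file's modification time: a file is eligible for deletion when its modification time falls in `[firstTimeMillis, lastTimeMillis)`. A minimal stdlib sketch of that comparison, outside the patch (class and method names here are illustrative, not part of the commit):

```java
// Sketch of the window test at the heart of DateFileFilter.accept():
// lower bound inclusive, upper bound exclusive, so adjacent prune runs
// with abutting windows never double-count a file.
public class PruneWindowSketch {

    // Mirrors the comparison on fileModificationTime in accept()
    static boolean inPruneWindow(long fileModificationTime, long firstTimeMillis, long lastTimeMillis) {
        return fileModificationTime >= firstTimeMillis && fileModificationTime < lastTimeMillis;
    }

    public static void main(String[] args) {
        long start = 1_000L, end = 2_000L;
        System.out.println(inPruneWindow(1_000L, start, end)); // inclusive lower bound
        System.out.println(inPruneWindow(2_000L, start, end)); // exclusive upper bound
    }
}
```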
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..7acc96c
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LeastRecentlyUsedPruner {
+ private static abstract class OptionHandler implements Function<String, Option> {}
+ private enum BulkLoadOptions {
+ HELP("h", new OptionHandler() {
+
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ return new Option(s, "help", false, "Generate Help screen");
+ }
+ }), TABLE("t", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "table", true, "HBase table to prune");
+ o.setRequired(true);
+ o.setArgName("HBASE_TABLE");
+ return o;
+ }
+ }), COLUMN_FAMILY("f", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+ o.setRequired(false);
+ o.setArgName("CF_NAME");
+ return o;
+ }
+ })
+ ,AS_OF_TIME("a", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+ o.setArgName("datetime");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ ,AS_OF_TIME_FORMAT("st", new OptionHandler() { // short code "t" would collide with TABLE above
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+ Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+ o.setArgName("format");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ,ACCESS_TABLE("u", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+ o.setRequired(true);
+ o.setArgName("HBASE_TABLE");
+ return o;
+ }
+ }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+ o.setRequired(true);
+ o.setArgName("CF_NAME");
+ return o;
+ }
+ });
+ Option option;
+ String shortCode;
+ BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+ this.shortCode = shortCode;
+ this.option = optionHandler.apply(shortCode);
+ }
+
+ public boolean has(CommandLine cli) {
+ return cli.hasOption(shortCode);
+ }
+
+ public String get(CommandLine cli) {
+ return cli.getOptionValue(shortCode);
+ }
+ private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+ Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+ return d.getTime();
+ }
+
+ private static DateFormat getFormat(CommandLine cli) {
+ DateFormat format = new SimpleDateFormat();
+ if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+ format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+ }
+ return format;
+ }
+
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+ try {
+ CommandLine cli = parser.parse(getOptions(), args);
+ if(BulkLoadOptions.HELP.has(cli)) {
+ printHelp();
+ System.exit(0);
+ }
+ return cli;
+ } catch (ParseException e) {
+ System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+ e.printStackTrace(System.err);
+ printHelp();
+ System.exit(-1);
+ return null;
+ }
+ }
+
+ public static void printHelp() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "LeastRecentlyUsedPruner", getOptions());
+ }
+
+ public static Options getOptions() {
+ Options ret = new Options();
+ for(BulkLoadOptions o : BulkLoadOptions.values()) {
+ ret.addOption(o.option);
+ }
+ return ret;
+ }
+ }
+
+ public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+ Scan scan = new Scan();
+ if(cf != null) {
+ scan.addFamily(Bytes.toBytes(cf));
+ }
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+// set other scan attrs
+
+ TableMapReduceUtil.initTableMapperJob(
+ sourceTable, // input table
+ scan, // Scan instance to control CF and attribute selection
+ PrunerMapper.class, // mapper class
+ null, // mapper output key
+ null, // mapper output value
+ job);
+ TableMapReduceUtil.initTableReducerJob(
+ sourceTable, // output table
+ null, // reducer class
+ job);
+ }
+
+ public static Job createJob( Configuration conf
+ , String table
+ , String cf
+ , String accessTrackerTable
+ , String accessTrackerColumnFamily
+ , Long ts
+ ) throws IOException
+ {
+ Job job = Job.getInstance(conf);
+ job.setJobName("LeastRecentlyUsedPruner: Pruning " + table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+ System.out.println("Configuring " + job.getJobName());
+ job.setJarByClass(LeastRecentlyUsedPruner.class);
+ job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+ job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+ setupHBaseJob(job, table, cf);
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+ Configuration conf = HBaseConfiguration.create();
+ String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+ CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+ Long ts = BulkLoadOptions.getTimestamp(cli);
+ String table = BulkLoadOptions.TABLE.get(cli);
+ String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+ String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+ String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+ Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+}
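The `getTimestamp()`/`getFormat()` pair above resolves the `--as_of` value against an optional `--as_of_format` pattern, falling back to the platform-default `SimpleDateFormat` pattern when no format is supplied. A minimal sketch of that resolution, outside the patch (the class and method names are illustrative, not the commit's API):

```java
// Sketch of how --as_of and --as_of_format combine into the epoch-millis
// cutoff passed to createJob(): the optional pattern selects the
// SimpleDateFormat used to parse the as_of string.
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class AsOfParseSketch {

    static long toTimestamp(String asOf, String formatOrNull) throws ParseException {
        DateFormat format = (formatOrNull == null)
                ? new SimpleDateFormat()              // platform-default pattern, as in getFormat()
                : new SimpleDateFormat(formatOrNull); // user-supplied --as_of_format
        return format.parse(asOf).getTime();
    }

    public static void main(String[] args) throws ParseException {
        long earlier = toTimestamp("04/25/2016", "MM/dd/yyyy");
        long later   = toTimestamp("04/26/2016", "MM/dd/yyyy");
        System.out.println(later > earlier); // later as_of dates yield larger cutoffs
    }
}
```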
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
new file mode 100644
index 0000000..d3a0549
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+public class StartDateException extends Exception {
+
+ public StartDateException(String message){
+ super(message);
+ }
+
+ public StartDateException(String message, Throwable t){
+ super(message,t);
+ }
+
+}