You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:46:21 UTC
[33/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..1cc591b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.common.configuration.EnrichmentConfig;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.common.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+public class ThreatIntelBulkLoader {
+ /** Builds the commons-cli {@link Option} that belongs to a given short option code. */
+ private abstract static class OptionHandler implements Function<String, Option> {}
+ /**
+  * Command line options understood by the bulk loader.
+  * Each constant owns its short code and the commons-cli {@link Option} built from it.
+  */
+ private enum BulkLoadOptions {
+   HELP("h", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       return new Option(s, "help", false, "Generate Help screen");
+     }
+   }),
+   TABLE("t", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "table", true, "HBase table to import data into");
+       opt.setRequired(true);
+       opt.setArgName("HBASE_TABLE");
+       return opt;
+     }
+   }),
+   COLUMN_FAMILY("f", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+       opt.setRequired(true);
+       opt.setArgName("CF_NAME");
+       return opt;
+     }
+   }),
+   EXTRACTOR_CONFIG("e", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+       opt.setArgName("JSON_FILE");
+       opt.setRequired(true);
+       return opt;
+     }
+   }),
+   INPUT_DATA("i", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+       opt.setArgName("DIR");
+       opt.setRequired(true);
+       return opt;
+     }
+   }),
+   AS_OF_TIME("a", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+       opt.setArgName("datetime");
+       opt.setRequired(false);
+       return opt;
+     }
+   }),
+   AS_OF_TIME_FORMAT("z", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+       opt.setArgName("format");
+       opt.setRequired(false);
+       return opt;
+     }
+   }),
+   CONVERTER("c", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
+       opt.setArgName("class");
+       opt.setRequired(false);
+       return opt;
+     }
+   }),
+   ENRICHMENT_CONFIG("n", new OptionHandler() {
+     @Nullable
+     @Override
+     public Option apply(@Nullable String s) {
+       Option opt = new Option(s, "enrichment_config", true
+                              , "JSON Document describing the enrichment configuration details." +
+                                " This is used to associate an enrichment type with a field type in zookeeper."
+                              );
+       opt.setArgName("JSON_FILE");
+       opt.setRequired(false);
+       return opt;
+     }
+   });
+
+   /** The commons-cli option built by this constant's handler. */
+   final Option option;
+   /** Short (single dash) code used to look the option up on a parsed command line. */
+   final String shortCode;
+
+   BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+     this.shortCode = shortCode;
+     this.option = optionHandler.apply(shortCode);
+   }
+
+   /** @return true when this option is present on the parsed command line. */
+   public boolean has(CommandLine cli) {
+     return cli.hasOption(shortCode);
+   }
+
+   /** @return the value supplied for this option, as reported by commons-cli. */
+   public String get(CommandLine cli) {
+     return cli.getOptionValue(shortCode);
+   }
+
+   /**
+    * Parses the arguments against all options of this enum.
+    * Prints the help screen and exits with status 0 when help is requested; prints the
+    * failure plus the help screen and exits with status -1 when the arguments are invalid.
+    *
+    * @param parser the commons-cli parser to use
+    * @param args the arguments left over after Hadoop's generic options were removed
+    * @return the parsed command line
+    */
+   public static CommandLine parse(CommandLineParser parser, String[] args) {
+     // Required options are enforced while parsing, so "-h" on its own would otherwise be
+     // reported as a missing-option failure. Look for the help flag before parsing.
+     if (isHelpRequested(args)) {
+       printHelp();
+       System.exit(0);
+     }
+     try {
+       CommandLine cli = parser.parse(getOptions(), args);
+       if (HELP.has(cli)) {
+         printHelp();
+         System.exit(0);
+       }
+       return cli;
+     } catch (ParseException e) {
+       System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+       e.printStackTrace(System.err);
+       printHelp();
+       System.exit(-1);
+       return null;
+     }
+   }
+
+   /** Checks the raw arguments for the short or long form of the help flag. */
+   private static boolean isHelpRequested(String[] args) {
+     String shortFlag = "-" + HELP.shortCode;
+     String longFlag = "--" + HELP.option.getLongOpt();
+     for (String arg : args) {
+       if (shortFlag.equals(arg) || longFlag.equals(arg)) {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   /** Prints the usage screen for all options. */
+   public static void printHelp() {
+     HelpFormatter formatter = new HelpFormatter();
+     formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+   }
+
+   /** @return a fresh {@link Options} holding the option of every enum constant. */
+   public static Options getOptions() {
+     Options all = new Options();
+     for (BulkLoadOptions candidate : BulkLoadOptions.values()) {
+       all.addOption(candidate.option);
+     }
+     return all;
+   }
+ }
+ /**
+  * Resolves the timestamp used to mark the loaded records.
+  *
+  * @param cli the parsed command line
+  * @return the as_of time in epoch milliseconds, or the current time when as_of is absent
+  * @throws java.text.ParseException if the as_of value does not match the supplied format
+  * @throws IllegalStateException if as_of is given without as_of_format
+  */
+ private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+   if (!BulkLoadOptions.AS_OF_TIME.has(cli)) {
+     return System.currentTimeMillis();
+   }
+   if (!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+     throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+   }
+   DateFormat asOfFormat = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+   Date asOf = asOfFormat.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+   return asOf.getTime();
+ }
+ /**
+  * Reads the extractor configuration file into a single string, lines joined by a newline.
+  *
+  * @param configFile local file holding the extractor JSON
+  * @return the file contents, decoded with the platform default charset
+  * @throws IOException if the file cannot be read
+  */
+ private static String readExtractorConfig(File configFile) throws IOException {
+   Iterable<String> configLines = Files.readLines(configFile, Charset.defaultCharset());
+   Joiner newlineJoiner = Joiner.on("\n");
+   return newlineJoiner.join(configLines);
+ }
+
+ /**
+  * Builds the map-only job that writes the extracted records to HBase.
+  *
+  * @param conf base Hadoop/HBase configuration
+  * @param input input path handed to the extractor's input format handler
+  * @param table HBase table to write to
+  * @param cf column family to write to
+  * @param extractorConfigContents extractor configuration JSON, passed to the mapper through the job configuration
+  * @param ts timestamp stored under {@code BulkLoadMapper.LAST_SEEN_KEY}
+  * @param converter converter whose class name is passed to the mapper
+  * @return the configured, not yet submitted job
+  * @throws IOException if the job cannot be created or the extractor configuration cannot be loaded
+  */
+ public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
+   // Job.getInstance replaces the deprecated Job(Configuration) constructor.
+   Job job = Job.getInstance(conf);
+   job.setJobName("ThreatIntelBulkLoader: " + input + " => " + table + ":" + cf);
+   System.out.println("Configuring " + job.getJobName());
+   job.setJarByClass(ThreatIntelBulkLoader.class);
+   job.setMapperClass(BulkLoadMapper.class);
+   job.setOutputFormatClass(TableOutputFormat.class);
+
+   // Settings go on the job's own configuration, which is what the tasks receive.
+   Configuration jobConf = job.getConfiguration();
+   jobConf.set(TableOutputFormat.OUTPUT_TABLE, table);
+   jobConf.set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+   jobConf.set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+   jobConf.set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+   jobConf.set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
+
+   job.setOutputKeyClass(ImmutableBytesWritable.class);
+   job.setOutputValueClass(Put.class);
+   job.setNumReduceTasks(0);
+
+   // The extractor configuration decides which input format reads the input path.
+   ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+   handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+   return job;
+ }
+
+ /**
+  * Command line entry point: parses the options, runs the bulk load job and, when an
+  * enrichment configuration was supplied, updates the sensor configs after a successful run.
+  * Exits with status 1 when the job fails and 0 otherwise.
+  */
+ public static void main(String... argv) throws Exception {
+   Configuration conf = HBaseConfiguration.create();
+   // Hadoop's generic options are consumed first; the remainder belongs to this tool.
+   String[] loaderArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+   CommandLine cli = BulkLoadOptions.parse(new PosixParser(), loaderArgs);
+
+   long asOfMillis = getTimestamp(cli);
+   String inputDir = BulkLoadOptions.INPUT_DATA.get(cli);
+   String tableName = BulkLoadOptions.TABLE.get(cli);
+   String columnFamily = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+   File extractorConfigFile = new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli));
+   String extractorConfigContents = readExtractorConfig(extractorConfigFile);
+
+   String converterClass = BulkLoadOptions.CONVERTER.has(cli)
+                         ? BulkLoadOptions.CONVERTER.get(cli)
+                         : EnrichmentConverter.class.getName();
+
+   EnrichmentConfig enrichmentConfig = null;
+   if (BulkLoadOptions.ENRICHMENT_CONFIG.has(cli)) {
+     File enrichmentConfigFile = new File(BulkLoadOptions.ENRICHMENT_CONFIG.get(cli));
+     enrichmentConfig = JSONUtils.INSTANCE.load(enrichmentConfigFile, EnrichmentConfig.class);
+   }
+
+   HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance();
+   Job job = createJob(conf, inputDir, tableName, columnFamily, extractorConfigContents, asOfMillis, converter);
+   System.out.println(conf);
+
+   boolean succeeded = job.waitForCompletion(true);
+   if (!succeeded) {
+     System.exit(1);
+   }
+   if (enrichmentConfig != null) {
+     enrichmentConfig.updateSensorConfigs();
+   }
+   System.exit(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
new file mode 100644
index 0000000..0cff227
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/cif/HBaseTableLoad.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.cif;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipInputStream;
+
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+
+import java.io.BufferedInputStream;
+
+public class HBaseTableLoad {
+
+ private static final Logger LOG = Logger.getLogger(HBaseTableLoad.class);
+ private static Configuration conf = null; // shared HBase configuration, assigned in the static initializer below
+ private String hbaseTable = "cif_table"; // target table; the default is replaced by the -t option in parse()
+ private String dirName = "./"; // directory being loaded; set from -d and reassigned by LoadDirHBase(String)
+ private boolean usefileList = false; // set to true once the -f option has been seen
+ private Set<String> files; // file list loaded for -f; consulted only when usefileList is true
+
+ /**
+ * Initialization: creates the shared HBase configuration once, when the class is loaded.
+ */
+ static {
+ conf = HBaseConfiguration.create();
+ }
+
+ /**
+  * Command line entry point. Creates a loader and lets it parse the arguments.
+  * The call that starts the directory load (LoadDirHBase) was commented out in the
+  * original and stays disabled here, so only argument handling runs.
+  */
+ public static void main(String[] args) {
+   HBaseTableLoad loader = new HBaseTableLoad();
+   loader.parse(args);
+   //loader.LoadDirHBase();
+ }
+
+ /**
+  * Loads every supported file (.gz, .zip, .json) found in {@link #dirName} into HBase and
+  * recurses into sub-directories. The folder name supplies the column family and
+  * qualifier, e.g. folder "infrastructure_botnet" gives family "infrastructure" and
+  * qualifier "botnet". Files in a folder whose name has no such two parts are skipped.
+  */
+ private void LoadDirHBase() {
+   LOG.info("Working on:" + dirName);
+   File folder = new File(dirName);
+   File[] listOfFiles = folder.listFiles();
+   if (listOfFiles == null) {
+     // listFiles() returns null when the path is not a directory or cannot be read.
+     LOG.error("Unable to list directory:" + folder.getAbsolutePath());
+     return;
+   }
+
+   // Split once per folder instead of once per file.
+   String[] nameParts = folder.getName().split("_");
+   boolean hasFamilyAndQualifier = nameParts.length >= 2;
+   boolean warnedAboutName = false;
+
+   for (File file : listOfFiles) {
+     if (file.isDirectory()) {
+       // Sub-directory: load it recursively. Only locals are used afterwards, so the
+       // dirName reassignment done by the recursive call does not affect this loop.
+       this.LoadDirHBase(file.getAbsolutePath());
+       continue;
+     }
+     if (!file.isFile()) {
+       continue;
+     }
+     // Check if filename is present in FileList
+     if (usefileList && (files == null || !files.contains(file.getAbsolutePath()))) {
+       continue;
+     }
+     if (!hasFamilyAndQualifier) {
+       if (!warnedAboutName) {
+         LOG.warn("Skipping files in " + folder.getAbsolutePath()
+             + ": folder name is not of the form <col_family>_<col_qualifier>");
+         warnedAboutName = true;
+       }
+       continue;
+     }
+     loadFile(file, nameParts[0], nameParts[1]);
+   }
+ }
+
+ /** Loads one file into HBase, always closing the stream opened for it. */
+ private void loadFile(File file, String colFamily, String colQualifier) {
+   InputStream input = null;
+   try {
+     input = openInput(file);
+     if (input == null) {
+       return; // unsupported extension
+     }
+     LOG.info("Begin Loading File:" + file.getAbsolutePath());
+     HBaseBulkPut(input, colFamily, colQualifier);
+     LOG.info("Completed Loading File:" + file.getAbsolutePath());
+   } catch (IOException | ParseException e) {
+     LOG.error("Failed Loading File:" + file.getAbsolutePath(), e);
+   } finally {
+     if (input != null) {
+       try {
+         input.close();
+       } catch (IOException e) {
+         LOG.warn("Unable to close File:" + file.getAbsolutePath(), e);
+       }
+     }
+   }
+ }
+
+ /**
+  * Opens a buffered stream over the file based on its extension.
+  *
+  * @return the stream, or null when the extension is not .gz, .zip or .json
+  */
+ private static InputStream openInput(File file) throws IOException {
+   String name = file.getName();
+   boolean gzip = name.endsWith(".gz");
+   boolean zip = !gzip && name.endsWith(".zip");
+   boolean json = !gzip && !zip && name.endsWith(".json");
+   if (!gzip && !zip && !json) {
+     return null;
+   }
+   FileInputStream raw = new FileInputStream(file);
+   try {
+     if (gzip) {
+       return new BufferedInputStream(new GZIPInputStream(raw));
+     }
+     if (zip) {
+       ZipInputStream archive = new ZipInputStream(raw);
+       // A ZipInputStream yields no data until it is positioned on an entry.
+       // Only the first entry of the archive is read.
+       archive.getNextEntry();
+       return new BufferedInputStream(archive);
+     }
+     return new BufferedInputStream(raw);
+   } catch (IOException e) {
+     // The wrapper could not be created; do not leak the underlying file handle.
+     try {
+       raw.close();
+     } catch (IOException ignored) {
+       // the original failure is the one worth reporting
+     }
+     throw e;
+   }
+ }
+
+ /**
+  * Makes the given directory the current one and loads it.
+  *
+  * @param dirname directory to load; stored in {@link #dirName}
+  */
+ private void LoadDirHBase(String dirname) {
+   dirName = dirname;
+   LoadDirHBase();
+ }
+
+ /**
+  * Inserts all json records picked up from the inputStream, one record per line.
+  * The "address" field of each record becomes the row key and the cell
+  * col_family:col_qualifier is set to "Y". Lines that are not a JSON object, or whose
+  * address is missing, empty or not a string, are skipped. The stream is read as UTF-8
+  * and is not closed here; it stays owned by the caller.
+  *
+  * @param input stream of line-delimited JSON records
+  * @param col_family column family to write to
+  * @param col_qualifier column qualifier to write to
+  * @throws IOException if reading the stream or writing to HBase fails
+  * @throws ParseException declared for callers; parse failures of single lines are skipped
+  */
+ private void HBaseBulkPut(InputStream input, String col_family,
+     String col_qualifier) throws IOException, ParseException {
+
+   HTable table = new HTable(conf, hbaseTable);
+   try {
+     JSONParser parser = new JSONParser();
+     BufferedReader br = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+     List<Put> allputs = new ArrayList<Put>();
+
+     byte[] family = Bytes.toBytes(col_family);
+     byte[] qualifier = Bytes.toBytes(col_qualifier);
+     // We are just adding a "Y" flag to mark this address
+     byte[] flag = Bytes.toBytes("Y");
+
+     String jsonString;
+     while ((jsonString = br.readLine()) != null) {
+       Object parsed;
+       try {
+         parsed = parser.parse(jsonString);
+       } catch (ParseException e) {
+         // Unparseable line: skip it and keep loading the rest of the file.
+         continue;
+       }
+       if (!(parsed instanceof Map)) {
+         continue;
+       }
+
+       // Get Address - either IP/domain or email and make that the Key
+       Object address = ((Map<?, ?>) parsed).get("address");
+       if (!(address instanceof String) || ((String) address).isEmpty()) {
+         continue;
+       }
+
+       Put put = new Put(Bytes.toBytes((String) address));
+       put.add(family, qualifier, flag);
+       allputs.add(put);
+     }
+     table.put(allputs);
+   } finally {
+     // Release the table even when reading or writing fails.
+     table.close();
+   }
+ }
+
+ /** Prints the command line synopsis to standard output. */
+ private void printUsage() {
+   String usage = "Usage: java -cp JarFile org.apache.metron.dataloads.cif.HBaseTableLoad -d <directory> -t <tablename> -f <optional file-list>";
+   System.out.println(usage);
+ }
+
+ /**
+  * Parses the command line and stores the directory (-d), the table name (-t) and the
+  * optional file list (-f) on this instance. Prints the usage and exits with status -1
+  * when -d or -t is missing or the arguments cannot be parsed.
+  *
+  * @param args raw command line arguments
+  */
+ private void parse(String[] args) {
+   CommandLineParser parser = new BasicParser();
+   Options options = new Options();
+
+   options.addOption("d", true, "description");
+   options.addOption("t", true, "description");
+   // -f carries the file-list path, so it must be declared as taking an argument;
+   // otherwise getOptionValue("f") is always null.
+   options.addOption("f", true, "description");
+
+   try {
+     CommandLine cmd = parser.parse(options, args);
+
+     if (cmd.hasOption("d")) {
+       this.dirName = cmd.getOptionValue("d");
+       LOG.info("Directory Name:" + cmd.getOptionValue("d"));
+     } else {
+       LOG.info("Missing Directory Name");
+       printUsage();
+       System.exit(-1);
+     }
+
+     if (cmd.hasOption("t")) {
+       this.hbaseTable = cmd.getOptionValue("t");
+       LOG.info("HBase Table Name:" + cmd.getOptionValue("t"));
+     } else {
+       LOG.info("Missing Table Name");
+       printUsage();
+       System.exit(-1);
+     }
+
+     if (cmd.hasOption("f")) {
+       this.usefileList = true;
+       files = LoadFileList(cmd.getOptionValue("f"));
+       LOG.info("FileList:" + cmd.getOptionValue("f"));
+     }
+
+   } catch (org.apache.commons.cli.ParseException e) {
+     LOG.error("Failed to parse comand line properties", e);
+     e.printStackTrace();
+     System.exit(-1);
+   }
+ }
+
+ /**
+  * Reads the file list: one entry per line, compared later against absolute file paths.
+  *
+  * @param filename path of the list file
+  * @return the lines read so far; empty (never null) when the file is missing or unreadable
+  */
+ private Set<String> LoadFileList(String filename) {
+   Set<String> output = new HashSet<String>();
+   if (filename == null) {
+     LOG.error("No file list name given");
+     return output;
+   }
+
+   // try-with-resources closes the reader even when reading fails part-way.
+   try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+       new FileInputStream(filename)))) {
+     String in;
+     while ((in = reader.readLine()) != null) {
+       output.add(in);
+     }
+   } catch (IOException e) {
+     LOG.error("Unable to read file list:" + filename, e);
+   }
+
+   return output;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
new file mode 100644
index 0000000..bd490c8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Turns lines of an input source into lookup key/value pairs.
+ */
+public interface Extractor {
+  /**
+   * Hands the extractor its configuration. {@code ExtractorHandler.load} calls this
+   * once, right after the handler has been read from JSON.
+   *
+   * @param config extractor specific settings
+   */
+  void initialize(Map<String, Object> config);
+
+  /**
+   * Extracts the lookup entries contained in one line of input.
+   *
+   * @param line a single line of the input
+   * @return the extracted entries; may be empty
+   * @throws IOException if the line cannot be processed
+   */
+  Iterable<LookupKV> extract(String line) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
new file mode 100644
index 0000000..6e081aa
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCreator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import java.util.Map;
+
+/**
+ * Factory for {@link Extractor} instances.
+ */
+public interface ExtractorCreator {
+  /**
+   * @return an extractor instance; the caller is expected to initialize it before use
+   */
+  Extractor create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
new file mode 100644
index 0000000..5d17cbe
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.inputformat.Formats;
+import org.apache.metron.dataloads.extractor.inputformat.InputFormatHandler;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * JSON-bound holder of an extractor configuration: the extractor to use, its settings
+ * and the input format handler (defaults to {@code Formats.BY_LINE}).
+ * Instances are created through the {@code load} methods.
+ */
+public class ExtractorHandler {
+  final static ObjectMapper _mapper = new ObjectMapper();
+  private Map<String, Object> config;
+  private Extractor extractor;
+  private InputFormatHandler inputFormatHandler = Formats.BY_LINE;
+
+  /** @return the extractor settings as read from the configuration */
+  public Map<String, Object> getConfig() {
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  /** @return the handler that configures the job input */
+  public InputFormatHandler getInputFormatHandler() {
+    return inputFormatHandler;
+  }
+
+  /**
+   * @param handler name understood by {@code Formats.create}
+   * @throws IllegalStateException if the handler cannot be created
+   */
+  public void setInputFormatHandler(String handler) {
+    try {
+      this.inputFormatHandler= Formats.create(handler);
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new IllegalStateException("Unable to create an inputformathandler", e);
+    }
+  }
+
+  /** @return the configured extractor, or null when none was set */
+  public Extractor getExtractor() {
+    return extractor;
+  }
+
+  /**
+   * @param extractor name understood by {@code Extractors.create}
+   * @throws IllegalStateException if the extractor cannot be created
+   */
+  public void setExtractor(String extractor) {
+    try {
+      this.extractor = Extractors.create(extractor);
+    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+      throw new IllegalStateException("Unable to create an extractor", e);
+    }
+  }
+
+  /**
+   * Reads a handler from JSON and initializes its extractor with the handler's config.
+   *
+   * @param is stream holding the JSON configuration
+   * @return the loaded handler with an initialized extractor
+   * @throws IOException if the JSON cannot be read
+   * @throws IllegalStateException if the configuration does not name an extractor
+   */
+  public static synchronized ExtractorHandler load(InputStream is) throws IOException {
+    ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
+    // Fail with a clear message instead of a bare NullPointerException.
+    if (ret == null || ret.getExtractor() == null) {
+      throw new IllegalStateException("Extractor configuration does not specify an extractor");
+    }
+    ret.getExtractor().initialize(ret.getConfig());
+    return ret;
+  }
+
+  /** Loads a handler from a JSON string, encoded to bytes with the given charset. */
+  public static synchronized ExtractorHandler load(String s, Charset c) throws IOException {
+    return load( new ByteArrayInputStream(s.getBytes(c)));
+  }
+
+  /** Loads a handler from a JSON string, encoded with the platform default charset. */
+  public static synchronized ExtractorHandler load(String s) throws IOException {
+    return load( s, Charset.defaultCharset());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
new file mode 100644
index 0000000..bbd4c22
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+
+import java.util.Map;
+
+/**
+ * The built-in extractors, each backed by a creator that returns a new instance.
+ */
+public enum Extractors implements ExtractorCreator {
+  CSV(new ExtractorCreator() {
+    @Override
+    public Extractor create() {
+      return new CSVExtractor();
+    }
+  }),
+  STIX(new ExtractorCreator() {
+    @Override
+    public Extractor create() {
+      return new StixExtractor();
+    }
+  });
+
+  ExtractorCreator _creator;
+
+  Extractors(ExtractorCreator creator) {
+    this._creator = creator;
+  }
+
+  /** @return a new extractor from this constant's creator */
+  @Override
+  public Extractor create() {
+    return _creator.create();
+  }
+
+  /**
+   * Creates an extractor by name. The name is first tried as an enum constant; when it is
+   * not one, it is treated as a class name and instantiated via its no-arg constructor.
+   *
+   * @param extractorName enum constant name or fully qualified class name
+   * @return the new extractor
+   */
+  public static Extractor create(String extractorName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    try {
+      return Extractors.valueOf(extractorName).create();
+    }
+    catch(IllegalArgumentException notBuiltIn) {
+      Class<?> custom = Class.forName(extractorName);
+      return (Extractor) custom.newInstance();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
new file mode 100644
index 0000000..1fce0fc
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.LookupKey;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Extractor for delimiter separated text. The configuration names the columns, says which
+ * column holds the indicator and where the type comes from (a fixed value or a column),
+ * and may override the separator and the lookup converter.
+ */
+public class CSVExtractor implements Extractor {
+  public static final String COLUMNS_KEY="columns";
+  public static final String INDICATOR_COLUMN_KEY="indicator_column";
+  public static final String TYPE_COLUMN_KEY="type_column";
+  public static final String TYPE_KEY="type";
+  public static final String SEPARATOR_KEY="separator";
+  public static final String LOOKUP_CONVERTER = "lookup_converter";
+
+  private int typeColumn;
+  private String type;
+  private int indicatorColumn;
+  private Map<String, Integer> columnMap = new HashMap<>();
+  // Builder defaults are used until a separator is configured, so extract() never sees a
+  // null parser.
+  private CSVParser parser = new CSVParserBuilder().build();
+  private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
+
+  /**
+   * Converts one line into a single lookup entry; lines starting with '#' are comments
+   * and yield nothing.
+   *
+   * @param line one line of the input
+   * @return an empty list for comments, otherwise one key/value pair
+   * @throws IOException if the line cannot be parsed
+   */
+  @Override
+  public Iterable<LookupKV> extract(String line) throws IOException {
+    if(line.trim().startsWith("#")) {
+      //comment
+      return Collections.emptyList();
+    }
+    String[] tokens = parser.parseLine(line);
+
+    LookupKey key = converter.toKey(getType(tokens), tokens[indicatorColumn]);
+    Map<String, String> values = new HashMap<>();
+    for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
+      values.put(kv.getKey(), tokens[kv.getValue()]);
+    }
+    return Collections.singletonList(new LookupKV(key, converter.toValue(values)));
+  }
+
+  /** Returns the fixed type when one is configured, otherwise the value of the type column. */
+  private String getType(String[] tokens) {
+    return type != null ? type : tokens[typeColumn];
+  }
+
+  /**
+   * Maps one column specification to its position. "name:pos" gives an explicit position,
+   * a plain "name" takes the running index i.
+   */
+  private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
+    if(column.contains(":")) {
+      Iterable<String> tokens = Splitter.on(':').split(column);
+      String col = Iterables.getFirst(tokens, null);
+      Integer pos = Integer.parseInt(Iterables.getLast(tokens));
+      return new AbstractMap.SimpleEntry<>(col, pos);
+    }
+    return new AbstractMap.SimpleEntry<>(column, i);
+  }
+
+  /**
+   * Builds the column name to position map from the "columns" setting, which may be a
+   * comma separated string, a list of specifications or a name to position map.
+   */
+  private static Map<String, Integer> getColumnMap(Map<String, Object> config) {
+    Map<String, Integer> columnMap = new HashMap<>();
+    if(!config.containsKey(COLUMNS_KEY)) {
+      return columnMap;
+    }
+    Object columnsObj = config.get(COLUMNS_KEY);
+    if(columnsObj instanceof String) {
+      int i = 0;
+      for (String column : Splitter.on(',').split((String) columnsObj)) {
+        Map.Entry<String, Integer> e = getColumnMapEntry(column, i++);
+        columnMap.put(e.getKey(), e.getValue());
+      }
+    }
+    else if(columnsObj instanceof List) {
+      int i = 0;
+      for(Object column : (List<?>) columnsObj) {
+        Map.Entry<String, Integer> e = getColumnMapEntry(column.toString(), i++);
+        columnMap.put(e.getKey(), e.getValue());
+      }
+    }
+    else if(columnsObj instanceof Map) {
+      for(Map.Entry<?, ?> e : ((Map<?, ?>) columnsObj).entrySet()) {
+        columnMap.put(e.getKey().toString(), Integer.parseInt(e.getValue().toString()));
+      }
+    }
+    return columnMap;
+  }
+
+  /** Resolves the column named by the given setting, failing clearly when it is unknown. */
+  private int resolveColumn(Map<String, Object> config, String key) {
+    String columnName = config.get(key).toString();
+    Integer position = columnMap.get(columnName);
+    if(position == null) {
+      throw new IllegalStateException("CSVExtractor " + key + " refers to unknown column '" + columnName + "'");
+    }
+    return position;
+  }
+
+  /**
+   * Applies the configuration.
+   *
+   * @param config extractor settings; must contain the "columns" entry
+   * @throws IllegalStateException if "columns" is missing, a referenced column is unknown
+   *         or the separator is empty
+   */
+  @Override
+  public void initialize(Map<String, Object> config) {
+    if(config.containsKey(COLUMNS_KEY)) {
+      columnMap = getColumnMap(config);
+    }
+    else {
+      throw new IllegalStateException("CSVExtractor requires " + COLUMNS_KEY + " configuration");
+    }
+    if(config.containsKey(INDICATOR_COLUMN_KEY)) {
+      indicatorColumn = resolveColumn(config, INDICATOR_COLUMN_KEY);
+    }
+    // A fixed type takes precedence over a type column.
+    if(config.containsKey(TYPE_KEY)) {
+      type = config.get(TYPE_KEY).toString();
+    }
+    else if(config.containsKey(TYPE_COLUMN_KEY)) {
+      typeColumn = resolveColumn(config, TYPE_COLUMN_KEY);
+    }
+    if(config.containsKey(SEPARATOR_KEY)) {
+      String separator = config.get(SEPARATOR_KEY).toString();
+      if(separator.isEmpty()) {
+        throw new IllegalStateException("CSVExtractor " + SEPARATOR_KEY + " must not be empty");
+      }
+      // Only the first character of the setting is used as the separator.
+      parser = new CSVParserBuilder().withSeparator(separator.charAt(0))
+                                     .build();
+    }
+    if(config.containsKey(LOOKUP_CONVERTER)) {
+      converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
new file mode 100644
index 0000000..e0ca4ee
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import java.util.Map;
+
+/**
+ * Converts the pieces of an extracted record into the key and value objects
+ * used by the lookup layer.
+ */
+public interface LookupConverter {
+  /**
+   * Builds the lookup key for an indicator.
+   *
+   * @param type the indicator type
+   * @param indicator the indicator value
+   * @return the key identifying this indicator
+   */
+  LookupKey toKey(String type, String indicator);
+
+  /**
+   * Builds the lookup value from a record's metadata.
+   *
+   * @param metadata name/value pairs describing the record
+   * @return the value to be stored alongside the key
+   */
+  LookupValue toValue(Map<String, String> metadata);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
new file mode 100644
index 0000000..bd58ba7
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.LookupValue;
+
+import java.util.Map;
+
+/**
+ * Built-in {@link LookupConverter} implementations, plus a factory that also
+ * accepts the fully qualified class name of a custom converter.
+ */
+public enum LookupConverters {
+
+  /** Produces {@link EnrichmentKey} / {@link EnrichmentValue} pairs. */
+  ENRICHMENT(new LookupConverter() {
+    @Override
+    public LookupKey toKey(String type, String indicator) {
+      return new EnrichmentKey(type, indicator);
+    }
+
+    @Override
+    public LookupValue toValue(Map<String, String> metadata) {
+      return new EnrichmentValue(metadata);
+    }
+  })
+  ;
+  LookupConverter converter;
+  LookupConverters(LookupConverter converter) {
+    this.converter = converter;
+  }
+
+  /**
+   * @return the converter backing this constant
+   */
+  public LookupConverter getConverter() {
+    return converter;
+  }
+
+  /**
+   * Resolves a converter by name.
+   *
+   * <p>The name is first matched against the constants of this enum. If it is not
+   * a constant, it is treated as a class name and instantiated reflectively via
+   * its no-argument constructor.
+   *
+   * @param name an enum constant name or a fully qualified class name
+   * @return the resolved converter
+   * @throws IllegalStateException if the class cannot be found or instantiated
+   */
+  public static LookupConverter getConverter(String name) {
+    try {
+      return LookupConverters.valueOf(name).getConverter();
+    }
+    // Only "no such constant" triggers the reflective fallback; catching Throwable
+    // here would hide Errors and unrelated failures behind a misleading message.
+    catch(IllegalArgumentException notAConstant) {
+      try {
+        return (LookupConverter) Class.forName(name).newInstance();
+      } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to parse " + name, e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
new file mode 100644
index 0000000..7e58455
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
+
+public enum Formats implements InputFormatHandler{
+ BY_LINE(new InputFormatHandler() {
+ @Override
+ public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+
+ FileInputFormat.addInputPath(job, input);
+ }
+ })
+ ;
+ InputFormatHandler _handler = null;
+ Formats(InputFormatHandler handler) {
+ this._handler = handler;
+ }
+ @Override
+ public void set(Job job, Path path, Map<String, Object> config) throws IOException {
+ _handler.set(job, path, config);
+ }
+
+ public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ try {
+ InputFormatHandler ec = Formats.valueOf(handlerName);
+ return ec;
+ }
+ catch(IllegalArgumentException iae) {
+ InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).newInstance();
+ return ex;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
new file mode 100644
index 0000000..2287969
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Configures how a MapReduce {@link Job} reads its input.
+ */
+public interface InputFormatHandler {
+  /**
+   * Applies this handler's input settings to the job.
+   *
+   * @param job the job to configure
+   * @param input the path the job should read from
+   * @param config handler-specific configuration
+   * @throws IOException if the job's input cannot be configured
+   */
+  void set(Job job, Path input, Map<String, Object> config) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
new file mode 100644
index 0000000..e0a58ef
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * {@link InputFormatHandler} that makes a job read each input file as a single
+ * record: files are never split and the record value is the complete file content.
+ */
+public class WholeFileFormat implements InputFormatHandler {
+
+  /**
+   * Record reader that emits exactly one record per split: a {@link NullWritable}
+   * key and the full content of the split's file as the value.
+   */
+  public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+    private FileSplit fileSplit;
+    private Configuration conf;
+    private Text value = new Text();
+    private boolean processed = false;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      this.fileSplit = (FileSplit) split;
+      this.conf = context.getConfiguration();
+    }
+
+    /**
+     * Reads the whole file into the value on the first call.
+     *
+     * @return {@code true} on the first call, {@code false} afterwards
+     * @throws IOException if the file cannot be read or is too large to be held
+     *         in a single byte array
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (processed) {
+        return false;
+      }
+      long length = fileSplit.getLength();
+      // The content is buffered in one byte[]; a blind (int) cast would wrap
+      // around for files over 2 GiB and either fail obscurely or truncate.
+      if (length > Integer.MAX_VALUE) {
+        throw new IOException("File " + fileSplit.getPath() + " is too large (" + length
+                             + " bytes) to be read as a single record");
+      }
+      byte[] contents = new byte[(int) length];
+      Path file = fileSplit.getPath();
+      FileSystem fs = file.getFileSystem(conf);
+      FSDataInputStream in = null;
+      try {
+        in = fs.open(file);
+        IOUtils.readFully(in, contents, 0, contents.length);
+        value.set(contents, 0, contents.length);
+      } finally {
+        IOUtils.closeStream(in);
+      }
+      processed = true;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException{
+      return value;
+    }
+
+    /** @return 1.0 once the file has been read, 0.0 before */
+    @Override
+    public float getProgress() throws IOException {
+      return processed ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void close() throws IOException{
+      // Nothing to release: the stream is closed inside nextKeyValue().
+    }
+  }
+
+  /** Input format that never splits a file and reads it with {@link WholeFileRecordReader}. */
+  public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+
+    @Override
+    protected boolean isSplitable(JobContext context, Path file) {
+      return false;
+    }
+
+    @Override
+    public RecordReader<NullWritable, Text> createRecordReader(
+            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      WholeFileRecordReader reader = new WholeFileRecordReader();
+      // initialize() only stores references, so a repeated call by the caller is harmless.
+      reader.initialize(split, context);
+      return reader;
+    }
+  }
+
+  /**
+   * Sets the job's input path and switches its input format to
+   * {@link WholeFileInputFormat}.
+   */
+  @Override
+  public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+    WholeFileInputFormat.setInputPaths(job, input);
+    job.setInputFormatClass(WholeFileInputFormat.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
new file mode 100644
index 0000000..4696639
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.base.Splitter;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.*;
+import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.cybox.cybox_2.Observable;
+import org.mitre.cybox.cybox_2.Observables;
+import org.mitre.stix.common_1.IndicatorBaseType;
+import org.mitre.stix.indicator_2.Indicator;
+import org.mitre.stix.stix_1.STIXPackage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StixExtractor implements Extractor {
+ Map<String, Object> config;
+ @Override
+ public Iterable<LookupKV> extract(String line) throws IOException {
+ STIXPackage stixPackage = STIXPackage.fromXMLString(line.replaceAll("\"Equal\"", "\"Equals\""));
+ List<LookupKV> ret = new ArrayList<>();
+ for(Observable o : getObservables(stixPackage)) {
+ ObjectType obj = o.getObject();
+ if(obj != null) {
+ ObjectPropertiesType props = obj.getProperties();
+ if(props != null) {
+ ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+ if (handler != null) {
+ Iterable<LookupKV> extractions = handler.extract(props, config);
+ for(LookupKV extraction : extractions) {
+ ret.add(extraction);
+ }
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ public List<Observable> getObservables(STIXPackage stixPackage) {
+ List<Observable> ret = new ArrayList<>();
+ Observables observables = stixPackage.getObservables();
+ if(observables != null) {
+ for (Observable o : observables.getObservables()) {
+ ret.add(o);
+ }
+ }
+ if (stixPackage.getIndicators() != null) {
+ if (stixPackage.getIndicators().getIndicators() != null) {
+ List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
+ int indicatorCount = indicators.size();
+ for (int i = 0; i < indicatorCount; i++) {
+ Indicator indicator = (Indicator) indicators.get(i);
+ if (indicator.getObservable() != null) {
+ ret.add(indicator.getObservable());
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void initialize(Map<String, Object> config) {
+ this.config = config;
+ }
+
+ public static Iterable<String> split(StringObjectPropertyType value) {
+ final ConditionTypeEnum condition = value.getCondition();
+ final ConditionApplicationEnum applyCondition = value.getApplyCondition();
+ List<String> tokens = new ArrayList<>();
+ if(condition == ConditionTypeEnum.EQUALS && applyCondition == ConditionApplicationEnum.ANY) {
+ String delim = value.getDelimiter();
+ String line = value.getValue().toString();
+ if (delim != null) {
+ for (String token : Splitter.on(delim).split(line)) {
+ tokens.add(token);
+ }
+ } else {
+ tokens.add(line);
+ }
+ }
+ return tokens;
+ }
+ public static void main(String[] args) throws IOException {
+
+ File file = new File("/tmp/sample.xml");
+
+ /*if (args.length > 0) {
+ file = new File(args[0]);
+ } else {
+ try {
+ URL url = XML2Object.class.getClass().getResource(
+ "/org/mitre/stix/examples/sample.xml");
+ file = new File(url.toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }*/
+
+ String line = FileUtils.readFileToString(file);
+ StixExtractor extractor = new StixExtractor();
+ for(LookupKV results : extractor.extract(line)) {
+ System.out.println(results);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
new file mode 100644
index 0000000..b637c6e
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AbstractObjectTypeHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+
+/**
+ * Base class for {@link ObjectTypeHandler}s that remembers the object type it
+ * handles and derives a default type name from it.
+ *
+ * @param <T> the object type this handler extracts from
+ */
+public abstract class AbstractObjectTypeHandler<T extends ObjectPropertiesType> implements ObjectTypeHandler<T> {
+  protected Class<T> objectPropertiesType;
+
+  /**
+   * @param clazz the object type this handler is responsible for
+   */
+  public AbstractObjectTypeHandler(Class<T> clazz) {
+    objectPropertiesType = clazz;
+  }
+
+  @Override
+  public Class<T> getTypeClass() {
+    return objectPropertiesType;
+  }
+
+  /**
+   * @return the simple class name of the handled type in lower case
+   */
+  public String getType() {
+    // Locale.ROOT keeps the result stable regardless of the JVM's default
+    // locale (e.g. the Turkish dotless-i mapping).
+    return getTypeClass().getSimpleName().toLowerCase(java.util.Locale.ROOT);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
new file mode 100644
index 0000000..ffcff43
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.base.Splitter;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Address;
+import org.mitre.cybox.objects.CategoryTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Extracts lookup entries from STIX {@link Address} objects.
+ *
+ * <p>The indicator type of every entry is {@code <type>:<category>}, where the
+ * type defaults to {@link #getType()} and can be overridden with
+ * {@link #TYPE_CONFIG}.
+ */
+public class AddressHandler extends AbstractObjectTypeHandler<Address> {
+  /** Config key holding a comma-separated list of categories to restrict extraction to. */
+  public static final String SPECIFIC_CATEGORY_CONFIG = "stix_address_categories";
+  /** Config key overriding the type prefix of the indicator type. */
+  public static final String TYPE_CONFIG = "stix_address_type";
+  public static final EnumSet<CategoryTypeEnum> SUPPORTED_CATEGORIES = EnumSet.of(CategoryTypeEnum.E_MAIL
+                                                                                  ,CategoryTypeEnum.IPV_4_ADDR
+                                                                                  ,CategoryTypeEnum.IPV_6_ADDR
+                                                                                  ,CategoryTypeEnum.MAC
+                                                                                  ) ;
+  public AddressHandler() {
+    super(Address.class);
+  }
+
+  /**
+   * Produces one entry per address value token.
+   *
+   * @param type the address object
+   * @param config optional configuration, may be {@code null}
+   * @return the extracted entries; empty when the category is unsupported, is
+   *         excluded by {@link #SPECIFIC_CATEGORY_CONFIG}, or the address has no value
+   * @throws IllegalArgumentException if a configured category is not a
+   *         {@link CategoryTypeEnum} constant name
+   */
+  @Override
+  public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    final CategoryTypeEnum category = type.getCategory();
+    if(!SUPPORTED_CATEGORIES.contains(category)) {
+      return ret;
+    }
+    String typeStr = getType();
+    if(config != null) {
+      if(config.containsKey(SPECIFIC_CATEGORY_CONFIG)) {
+        EnumSet<CategoryTypeEnum> specificCategories = EnumSet.noneOf(CategoryTypeEnum.class);
+        // trimResults() lets "A, B" be written with spaces after the commas.
+        for (String c : Splitter.on(",").trimResults().split(config.get(SPECIFIC_CATEGORY_CONFIG).toString())) {
+          specificCategories.add(CategoryTypeEnum.valueOf(c));
+        }
+        if (!specificCategories.contains(category)) {
+          return ret;
+        }
+      }
+      if(config.containsKey(TYPE_CONFIG)) {
+        typeStr = config.get(TYPE_CONFIG).toString();
+      }
+    }
+    StringObjectPropertyType value = type.getAddressValue();
+    if(value == null) {
+      // Address without a value: nothing to index.
+      return ret;
+    }
+    final String indicatorType = typeStr + ":" + category;
+    String source = null;
+    for(String token : StixExtractor.split(value)) {
+      if(source == null) {
+        // Serialized on first use only and shared by all tokens of this object.
+        source = type.toXMLString();
+      }
+      // A plain HashMap avoids the anonymous subclass (and its captured
+      // references) that double-brace initialization creates for every entry.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", source);
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * @return {@code <type>:<category>} for every supported category, using the default type
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for(CategoryTypeEnum e : SUPPORTED_CATEGORIES)
+    {
+      ret.add(typeStr + ":" + e);
+    }
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
new file mode 100644
index 0000000..755cddd
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.DomainName;
+import org.mitre.cybox.objects.DomainNameTypeEnum;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Extracts lookup entries from STIX {@link DomainName} objects.
+ *
+ * <p>Domains whose type is absent or {@code FQDN} are extracted; the indicator
+ * type of every entry is {@code <type>:FQDN}, where the type defaults to
+ * {@link #getType()} and can be overridden with {@link #TYPE_CONFIG}.
+ */
+public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
+  /** Config key overriding the type prefix of the indicator type. */
+  public static final String TYPE_CONFIG = "stix_domain_type";
+  // Shared constant rather than a per-instance field, matching AddressHandler.
+  static final EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
+  public DomainHandler() {
+    super(DomainName.class);
+  }
+
+  /**
+   * Produces one entry per domain value token.
+   *
+   * @param type the domain object
+   * @param config optional configuration, may be {@code null}
+   * @return the extracted entries; empty when the domain type is unsupported or
+   *         the domain has no value
+   */
+  @Override
+  public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    String typeStr = getType();
+    if(config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if(o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final DomainNameTypeEnum domainType = type.getType();
+    // A domain without an explicit type is handled like an FQDN.
+    if(domainType != null && !SUPPORTED_TYPES.contains(domainType)) {
+      return ret;
+    }
+    StringObjectPropertyType value = type.getValue();
+    if(value == null) {
+      // Domain without a value: nothing to index.
+      return ret;
+    }
+    final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
+    String source = null;
+    for (String token : StixExtractor.split(value)) {
+      if(source == null) {
+        // Serialized on first use only and shared by all tokens of this object.
+        source = type.toXMLString();
+      }
+      // A plain HashMap avoids the anonymous subclass (and its captured
+      // references) that double-brace initialization creates for every entry.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", source);
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * @return {@code <type>:<domain type>} for every supported domain type, using the default type
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for(DomainNameTypeEnum e : SUPPORTED_TYPES)
+    {
+      ret.add(typeStr + ":" + e);
+    }
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
new file mode 100644
index 0000000..c7b05eb
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.StringObjectPropertyType;
+import org.mitre.cybox.objects.Hostname;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extracts lookup entries from STIX {@link Hostname} objects.
+ *
+ * <p>The indicator type of every entry defaults to {@link #getType()} and can
+ * be overridden with {@link #TYPE_CONFIG}.
+ */
+public class HostnameHandler extends AbstractObjectTypeHandler<Hostname>{
+  /** Config key overriding the indicator type. */
+  public static final String TYPE_CONFIG = "stix_hostname_type";
+  public HostnameHandler() {
+    super(Hostname.class);
+  }
+
+  /**
+   * Produces one entry per hostname value token.
+   *
+   * @param type the hostname object
+   * @param config optional configuration, may be {@code null}
+   * @return the extracted entries; empty when the hostname has no value
+   */
+  @Override
+  public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    StringObjectPropertyType value = type.getHostnameValue();
+    if(value == null) {
+      // Hostname without a value: nothing to index.
+      return ret;
+    }
+    String typeStr = getType();
+    if(config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if(o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final String indicatorType = typeStr;
+    String source = null;
+    for(String token : StixExtractor.split(value)) {
+      if(source == null) {
+        // Serialized on first use only and shared by all tokens of this object.
+        source = type.toXMLString();
+      }
+      // A plain HashMap avoids the anonymous subclass (and its captured
+      // references) that double-brace initialization creates for every entry.
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", source);
+      ret.add(new LookupKV(new EnrichmentKey(indicatorType, token), new EnrichmentValue(metadata)));
+    }
+    return ret;
+  }
+
+  /**
+   * @return a single-element list holding the default type
+   */
+  @Override
+  public List<String> getPossibleTypes() {
+    return ImmutableList.of(getType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
new file mode 100644
index 0000000..c7692be
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface ObjectTypeHandler<T extends ObjectPropertiesType> { // contract for turning objects of type T into lookup entries
+ Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException; // entries extracted from one object; config is a settings map read by implementations
+ Class<T> getTypeClass(); // class of the objects this handler accepts; ObjectTypeHandlers compares instance classes against it
+ List<String> getPossibleTypes(); // presumably the indicator type names this handler can emit - confirm against callers
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
new file mode 100644
index 0000000..06d8cd8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandlers.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor.stix.types;
+
+import org.mitre.cybox.common_2.ObjectPropertiesType;
+
+/** Enumerates the available STIX object type handlers and finds the one matching a given object instance. */
+public enum ObjectTypeHandlers {
+  ADDRESS(new AddressHandler())
+  ,HOSTNAME(new HostnameHandler())
+  ,DOMAINNAME(new DomainHandler())
+  ,;
+  ObjectTypeHandler _handler;
+  ObjectTypeHandlers(ObjectTypeHandler handler) { this._handler = handler; }
+  ObjectTypeHandler getHandler() {
+    return this._handler;
+  }
+  /** Returns the handler whose type class equals the exact runtime class of {@code inst}, or null if none does. */
+  public static ObjectTypeHandler getHandlerByInstance(ObjectPropertiesType inst) {
+    final Class<?> instClass = inst.getClass();
+    for(ObjectTypeHandlers candidate : values()) {
+      ObjectTypeHandler handler = candidate.getHandler();
+      if(instClass.equals(handler.getTypeClass())) { return handler; }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
new file mode 100644
index 0000000..558ac16
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+
+/** Mapper that runs each input line through the configured extractor and emits one HBase Put per extracted entry. */
+public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put>
+{
+  public static final String CONFIG_KEY="bl_extractor_config";
+  public static final String COLUMN_FAMILY_KEY = "bl_column_family";
+  public static final String LAST_SEEN_KEY = "bl_last_seen";
+  public static final String CONVERTER_KEY = "bl_converter";
+  Extractor extractor = null;
+  String columnFamily = null;
+  HbaseConverter converter;
+  /** Configures the extractor, column family and converter from the job configuration. */
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+    initialize(context.getConfiguration());
+  }
+  /** Extracts lookup entries from {@code value} and writes every non-null one, keyed by the bytes of its key. */
+  @Override
+  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+    for(LookupKV entry : extractor.extract(value.toString())) {
+      if(entry == null) { continue; }
+      Put put = converter.toPut(columnFamily, entry.getKey(), entry.getValue());
+      write(new ImmutableBytesWritable(entry.getKey().toBytes()), put, context);
+    }
+  }
+  /**
+   * Builds the extractor from {@link #CONFIG_KEY}, reads the column family and instantiates the class named by
+   * {@link #CONVERTER_KEY} through its no-argument constructor.
+   * @throws IllegalStateException if the converter class cannot be found, instantiated or accessed
+   */
+  protected void initialize(Configuration configuration) throws IOException{
+    extractor = ExtractorHandler.load(configuration.get(CONFIG_KEY)).getExtractor();
+    columnFamily = configuration.get(COLUMN_FAMILY_KEY);
+    final String converterClass = configuration.get(CONVERTER_KEY);
+    try {
+      converter = (HbaseConverter) Class.forName(converterClass).newInstance();
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+      throw new IllegalStateException("Unable to create converter object: " + converterClass, e);
+    }
+  }
+  /** Emits the key/Put pair through the context; protected, so subclasses can override how output is written. */
+  protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+    context.write(key, value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
new file mode 100644
index 0000000..d0f1e46
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.metron.enrichment.lookup.LookupKey;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackerUtil;
+
+import java.io.IOException;
+
+/** Table mapper that emits a Delete for every row whose key the loaded access tracker has not seen. */
+public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
+  public static final String ACCESS_TRACKER_TABLE_CONF = "access_tracker_table";
+  public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf";
+  public static final String TIMESTAMP_CONF = "access_tracker_timestamp";
+  public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name";
+  AccessTracker tracker;
+  /**
+   * Loads the access trackers from the configured tracker table and merges them into {@code tracker}.
+   * The table handle is closed before returning (also on failure) rather than leaked for the task's lifetime.
+   * NOTE(review): closing assumes the merged tracker no longer reads from the table; confirm in AccessTrackerUtil.
+   * @throws IllegalStateException if the timestamp setting is absent or negative, or the trackers cannot be loaded
+   */
+  @Override
+  public void setup(Context context) throws IOException {
+    String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF);
+    String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF);
+    String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF);
+    try (HTable table = new HTable(context.getConfiguration(), atTable)) {
+      long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
+      if(timestamp < 0) {
+        throw new IllegalStateException("Must specify a timestamp that is positive.");
+      }
+      try {
+        tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
+      } catch (Throwable e) {
+        throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);
+      }
+    }
+  }
+  /** Writes a Delete for {@code key} unless the tracker reports the key as seen. */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+    if(tracker == null || key == null) {
+      throw new RuntimeException("Tracker = " + tracker + " key = " + key);
+    }
+    if(!tracker.hasSeen(toLookupKey(key.get()))) {
+      context.write(key, new Delete(key.get()));
+    }
+  }
+  /** Wraps raw key bytes in a LookupKey whose {@code toBytes} returns them; {@code fromBytes} does nothing. */
+  protected LookupKey toLookupKey(final byte[] bytes) {
+    return new LookupKey() {
+      @Override
+      public byte[] toBytes() { return bytes; }
+      @Override
+      public void fromBytes(byte[] in) { /* intentionally empty: this wrapper is never populated from bytes */ }
+    };
+  }
+}