Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:52 UTC

[10/43] incubator-metron git commit: METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..8f1f205
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LeastRecentlyUsedPruner {
+    private static abstract class OptionHandler implements Function<String, Option> {}
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        }), TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to prune");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+                o.setRequired(false);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+                o.setArgName("datetime");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME_FORMAT("d", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,ACCESS_TABLE("u", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        });
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+        private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+            Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+            return d.getTime();
+        }
+
+        private static DateFormat getFormat(CommandLine cli) {
+            DateFormat format = new SimpleDateFormat();
+            if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                 format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+            }
+            return format;
+        }
+
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "LeastRecentlyUsedPruner", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+               ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+
+    public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+        Scan scan = new Scan();
+        if(cf != null) {
+            scan.addFamily(Bytes.toBytes(cf));
+        }
+        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+        scan.setCacheBlocks(false);  // don't set to true for MR jobs
+        // set other scan attrs
+
+        TableMapReduceUtil.initTableMapperJob(
+                sourceTable,      // input table
+                scan,	          // Scan instance to control CF and attribute selection
+                PrunerMapper.class,   // mapper class
+                null,	          // mapper output key
+                null,	          // mapper output value
+                job);
+        TableMapReduceUtil.initTableReducerJob(
+                sourceTable,      // output table
+                null,             // reducer class
+                job);
+    }
+
+    public static Job createJob( Configuration conf
+                               , String table
+                               , String cf
+                               , String accessTrackerTable
+                               , String accessTrackerColumnFamily
+                               , Long ts
+                               ) throws IOException
+    {
+        Job job = new Job(conf);
+        job.setJobName("LeastRecentlyUsedPruner: Pruning " +  table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(LeastRecentlyUsedPruner.class);
+        job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+        setupHBaseJob(job, table, cf);
+        job.setNumReduceTasks(0);
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+        Configuration conf = HBaseConfiguration.create();
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = BulkLoadOptions.getTimestamp(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+        String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+        Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}
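
The pruner above can be driven either from main() with the CLI options defined in BulkLoadOptions, or programmatically via createJob. A minimal programmatic sketch follows; the table, column family, and access-tracker names are illustrative, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner;

    public class PrunerExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Only access trackers written after this "as of" time are considered (here: the last 24 hours).
            long asOf = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
            Job job = LeastRecentlyUsedPruner.createJob(conf, "threat_intel", "t",
                                                        "access_trackers", "v", asOf);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }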

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..7d7ef98
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+public class ThreatIntelBulkLoader  {
+    private static abstract class OptionHandler implements Function<String, Option> {}
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        })
+        ,TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to import data into");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        })
+        ,COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+                o.setArgName("JSON_FILE");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,INPUT_DATA("i", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+                o.setArgName("DIR");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+                o.setArgName("datetime");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,AS_OF_TIME_FORMAT("d", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,CONVERTER("c", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
+                o.setArgName("class");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ;
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+               ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+        if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
+            if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+            }
+            else {
+                DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+                Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+                return d.getTime();
+            }
+        }
+        else {
+            return System.currentTimeMillis();
+        }
+    }
+    private static String readExtractorConfig(File configFile) throws IOException {
+        return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+    }
+
+    public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
+        Job job = new Job(conf);
+        job.setJobName("ThreatIntelBulkLoader: " + input + " => " +  table + ":" + cf);
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(ThreatIntelBulkLoader.class);
+        job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
+        job.setOutputFormatClass(TableOutputFormat.class);
+        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+        job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+        job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+        job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+        job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+        job.setNumReduceTasks(0);
+        ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+        handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException, IllegalAccessException, InstantiationException {
+        Configuration conf = HBaseConfiguration.create();
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = getTimestamp(cli);
+        String input = BulkLoadOptions.INPUT_DATA.get(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+        String converterClass = ThreatIntelConverter.class.getName();
+        if(BulkLoadOptions.CONVERTER.has(cli)) {
+            converterClass = BulkLoadOptions.CONVERTER.get(cli);
+        }
+        HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance();
+        Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
+        System.out.println(conf);
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}
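
Likewise, the bulk loader's createJob can be invoked directly; the sketch below assumes an existing HDFS input directory and target table, and reads the same extractor-config JSON that --extractor_config would take on the command line (all paths and names are illustrative):

    import java.io.File;
    import java.nio.charset.Charset;
    import com.google.common.io.Files;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
    import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;

    public class BulkLoadExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            String extractorConfig = Files.toString(new File("/tmp/extractor_config.json"),
                                                    Charset.defaultCharset());
            Job job = ThreatIntelBulkLoader.createJob(conf,
                    "/user/metron/threatintel_input",   // HDFS input directory
                    "threat_intel", "t",                // target table and column family
                    extractorConfig,
                    System.currentTimeMillis(),         // last-seen timestamp stamped on the records
                    new ThreatIntelConverter());        // default converter, same as main() uses
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }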

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
index 41f667b..30e56d8 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -17,12 +17,14 @@
  */
 package org.apache.metron.dataloads.extractor;
 
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
 
 import java.io.IOException;
 import java.util.Map;
 
 public interface Extractor {
-    Iterable<ThreatIntelResults> extract(String line) throws IOException;
+    Iterable<LookupKV> extract(String line) throws IOException;
     void initialize(Map<String, Object> config);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index f5ec434..94e9ccb 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -21,7 +21,12 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.opencsv.CSVParser;
 import com.opencsv.CSVParserBuilder;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
 import org.apache.metron.threatintel.ThreatIntelResults;
 
 import java.io.IOException;
@@ -31,24 +36,26 @@ public class CSVExtractor implements Extractor {
     public static final String COLUMNS_KEY="columns";
     public static final String INDICATOR_COLUMN_KEY="indicator_column";
     public static final String SEPARATOR_KEY="separator";
+    public static final String LOOKUP_CONVERTER = "lookupConverter";
 
     private int indicatorColumn;
     private Map<String, Integer> columnMap = new HashMap<>();
     private CSVParser parser;
+    private LookupConverter converter = LookupConverters.THREAT_INTEL.getConverter();
 
     @Override
-    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+    public Iterable<LookupKV> extract(String line) throws IOException {
         if(line.trim().startsWith("#")) {
             //comment
             return Collections.emptyList();
         }
-        ThreatIntelResults ret = new ThreatIntelResults();
         String[] tokens = parser.parseLine(line);
-        ret.getKey().indicator = tokens[indicatorColumn];
+        LookupKey key = converter.toKey(tokens[indicatorColumn]);
+        Map<String, String> values = new HashMap<>();
         for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
-            ret.getValue().put(kv.getKey(), tokens[kv.getValue()]);
+            values.put(kv.getKey(), tokens[kv.getValue()]);
         }
-        return Arrays.asList(ret);
+        return Arrays.asList(new LookupKV(key, converter.toValue(values)));
     }
 
     private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
@@ -109,5 +116,8 @@ public class CSVExtractor implements Extractor {
             parser = new CSVParserBuilder().withSeparator(separator)
                                            .build();
         }
+        if(config.containsKey(LOOKUP_CONVERTER)) {
+           converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
new file mode 100644
index 0000000..9e9b79f
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.util.Map;
+
+public interface LookupConverter {
+    LookupKey toKey(String indicator);
+    LookupValue toValue(Map<String, String> metadata);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
new file mode 100644
index 0000000..7f9218a
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.util.Map;
+
+public enum LookupConverters {
+
+    THREAT_INTEL(new LookupConverter() {
+        @Override
+        public LookupKey toKey(String indicator) {
+            return new ThreatIntelKey(indicator);
+        }
+
+        @Override
+        public LookupValue toValue(Map<String, String> metadata) {
+            return new ThreatIntelValue(metadata);
+        }
+    })
+    ;
+    LookupConverter converter;
+    LookupConverters(LookupConverter converter) {
+        this.converter = converter;
+    }
+    public LookupConverter getConverter() {
+        return converter;
+    }
+
+    public static LookupConverter getConverter(String name) {
+        try {
+            return LookupConverters.valueOf(name).getConverter();
+        }
+        catch(Throwable t) {
+            try {
+                return (LookupConverter) Class.forName(name).newInstance();
+            } catch (InstantiationException e) {
+                throw new IllegalStateException("Unable to parse " + name, e);
+            } catch (IllegalAccessException e) {
+                throw new IllegalStateException("Unable to parse " + name, e);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalStateException("Unable to parse " + name, e);
+            }
+        }
+    }
+
+}
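
getConverter(String) above resolves its argument first as an enum constant and only then falls back to Class.forName; a brief usage sketch (the custom class name is hypothetical):

    // Resolves to the built-in threat intel converter.
    LookupConverter builtIn = LookupConverters.getConverter("THREAT_INTEL");
    // Anything that is not an enum constant is treated as a fully-qualified LookupConverter class name.
    LookupConverter custom = LookupConverters.getConverter("com.example.MyLookupConverter");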

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
index 039092f..b3829b4 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -23,9 +23,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
 import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.*;
 import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.cybox.cybox_2.Observable;
+import org.mitre.cybox.cybox_2.Observables;
 import org.mitre.stix.common_1.IndicatorBaseType;
 import org.mitre.stix.indicator_2.Indicator;
 import org.mitre.stix.stix_1.STIXPackage;
@@ -39,9 +42,35 @@ import java.util.Map;
 public class StixExtractor implements Extractor {
     Map<String, Object> config;
     @Override
-    public Iterable<ThreatIntelResults> extract(String line) throws IOException {
-        STIXPackage stixPackage = STIXPackage.fromXMLString(line);
-        List<ThreatIntelResults> ret = new ArrayList<>();
+    public Iterable<LookupKV> extract(String line) throws IOException {
+        STIXPackage stixPackage = STIXPackage.fromXMLString(line.replaceAll("\"Equal\"", "\"Equals\""));
+        List<LookupKV> ret = new ArrayList<>();
+        for(Observable o : getObservables(stixPackage)) {
+            ObjectType obj = o.getObject();
+            if(obj != null) {
+                ObjectPropertiesType props = obj.getProperties();
+                if(props != null) {
+                    ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+                    if (handler != null) {
+                        Iterable<LookupKV> extractions = handler.extract(props, config);
+                        for(LookupKV extraction : extractions) {
+                            ret.add(extraction);
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    public List<Observable> getObservables(STIXPackage stixPackage) {
+        List<Observable> ret = new ArrayList<>();
+        Observables observables = stixPackage.getObservables();
+        if(observables != null) {
+            for (Observable o : observables.getObservables()) {
+                ret.add(o);
+            }
+        }
         if (stixPackage.getIndicators() != null) {
             if (stixPackage.getIndicators().getIndicators() != null) {
                 List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
@@ -49,12 +78,7 @@ public class StixExtractor implements Extractor {
                 for (int i = 0; i < indicatorCount; i++) {
                     Indicator indicator = (Indicator) indicators.get(i);
                     if (indicator.getObservable() != null) {
-                        ObjectType obj = indicator.getObservable().getObject();
-                        ObjectPropertiesType props = obj.getProperties();
-                        ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
-                        if(handler != null) {
-                            Iterables.addAll(ret, handler.extract(props, config));
-                        }
+                        ret.add(indicator.getObservable());
                     }
                 }
             }
@@ -102,7 +126,7 @@ public class StixExtractor implements Extractor {
 
         String line = FileUtils.readFileToString(file);
         StixExtractor extractor = new StixExtractor();
-        for(ThreatIntelResults results : extractor.extract(line)) {
+        for(LookupKV results : extractor.extract(line)) {
             System.out.println(results);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
index ee4ff78..638a9ce 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -19,10 +19,10 @@ package org.apache.metron.dataloads.extractor.stix.types;
 
 import com.google.common.base.Splitter;
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
-import org.mitre.cybox.common_2.ConditionApplicationEnum;
-import org.mitre.cybox.common_2.ConditionTypeEnum;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.Address;
 import org.mitre.cybox.objects.CategoryTypeEnum;
@@ -42,8 +42,8 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
     }
 
     @Override
-    public Iterable<ThreatIntelResults> extract(final Address type, Map<String, Object> config) throws IOException {
-        List<ThreatIntelResults> ret = new ArrayList<>();
+    public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
+        List<LookupKV> ret = new ArrayList<>();
         final CategoryTypeEnum category= type.getCategory();
         if(!SUPPORTED_CATEGORIES.contains(category)) {
            return ret;
@@ -61,13 +61,15 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
         }
         StringObjectPropertyType value = type.getAddressValue();
         for(String token : StixExtractor.split(value)) {
-            ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+            LookupKV results = new LookupKV(new ThreatIntelKey(token)
+                                           , new ThreatIntelValue(
                                                                     new HashMap<String, String>() {{
                                                                         put("source-type", "STIX");
-                                                                        put("indicator-type", "Address");
+                                                                        put("indicator-type", type.getClass().getSimpleName() + ":" + category);
                                                                         put("source", type.toXMLString());
                                                                     }}
-                                                                   );
+                                                                 )
+                                           );
                 ret.add(results);
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index e0444fb..233550b 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -18,7 +18,9 @@
 package org.apache.metron.dataloads.extractor.stix.types;
 
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.DomainName;
@@ -34,19 +36,21 @@ public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
     }
 
     @Override
-    public Iterable<ThreatIntelResults> extract(final DomainName type, Map<String, Object> config) throws IOException {
-        List<ThreatIntelResults> ret = new ArrayList<>();
+    public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+        List<LookupKV> ret = new ArrayList<>();
         final DomainNameTypeEnum domainType = type.getType();
-        if(SUPPORTED_TYPES.contains(domainType)) {
+        if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
             StringObjectPropertyType value = type.getValue();
             for (String token : StixExtractor.split(value)) {
-                ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
-                        new HashMap<String, String>() {{
-                            put("source-type", "STIX");
-                            put("indicator-type", "DomainName");
-                            put("source", type.toXMLString());
-                        }}
-                );
+                LookupKV results = new LookupKV(new ThreatIntelKey(token)
+                                               , new ThreatIntelValue(
+                                                    new HashMap<String, String>() {{
+                                                        put("source-type", "STIX");
+                                                        put("indicator-type", type.getClass().getSimpleName() + ":" + DomainNameTypeEnum.FQDN);
+                                                        put("source", type.toXMLString());
+                                                    }}
+                                                                    )
+                                               );
                 ret.add(results);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index d0c55a9..702c440 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -19,7 +19,9 @@
 package org.apache.metron.dataloads.extractor.stix.types;
 
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.Hostname;
@@ -36,17 +38,18 @@ public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
     }
 
     @Override
-    public Iterable<ThreatIntelResults> extract(final Hostname type, Map<String, Object> config) throws IOException {
+    public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
         StringObjectPropertyType value = type.getHostnameValue();
-        List<ThreatIntelResults> ret = new ArrayList<>();
+        List<LookupKV> ret = new ArrayList<>();
         for(String token : StixExtractor.split(value)) {
-            ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
-                                                                    new HashMap<String, String>() {{
+            LookupKV results = new LookupKV(new ThreatIntelKey(token)
+                                           , new ThreatIntelValue(new HashMap<String, String>() {{
                                                                         put("source-type", "STIX");
-                                                                        put("indicator-type", "Hostname");
+                                                                        put("indicator-type", type.getClass().getSimpleName());
                                                                         put("source", type.toXMLString());
                                                                     }}
-                                                                   );
+                                                                 )
+                                           );
                 ret.add(results);
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
index f639be0..e5a5296 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -18,6 +18,7 @@
 package org.apache.metron.dataloads.extractor.stix.types;
 
 import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.ObjectPropertiesType;
 
@@ -25,6 +26,6 @@ import java.io.IOException;
 import java.util.Map;
 
 public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
-    Iterable<ThreatIntelResults> extract(T type, Map<String, Object> config) throws IOException;
+    Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
     Class<T> getTypeClass();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
index 0ad09e6..04714d9 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
 
 import java.io.IOException;
 
@@ -34,9 +36,11 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
     public static final String CONFIG_KEY="bl_extractor_config";
     public static final String COLUMN_FAMILY_KEY = "bl_column_family";
     public static final String LAST_SEEN_KEY = "bl_last_seen";
+    public static final String CONVERTER_KEY = "bl_converter";
     Extractor extractor = null;
     String columnFamily = null;
     Long lastSeen = null;
+    HbaseConverter converter;
     @Override
     public void setup(Context context) throws IOException,
             InterruptedException {
@@ -45,19 +49,28 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
 
     @Override
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
-        for(ThreatIntelResults results : extractor.extract(value.toString())) {
+        for(LookupKV results : extractor.extract(value.toString())) {
             if (results != null) {
-                Put put = Converter.INSTANCE.toPut(columnFamily, results.getKey(), results.getValue(), lastSeen);
+                Put put = converter.toPut(columnFamily, results.getKey(), results.getValue());
                 write(new ImmutableBytesWritable(results.getKey().toBytes()), put, context);
             }
         }
     }
 
-    protected void initialize(Configuration configuration) throws IOException {
+    protected void initialize(Configuration configuration) throws IOException{
         String configStr = configuration.get(CONFIG_KEY);
         extractor = ExtractorHandler.load(configStr).getExtractor();
         columnFamily = configuration.get(COLUMN_FAMILY_KEY);
         lastSeen = Long.parseLong(configuration.get(LAST_SEEN_KEY));
+        try {
+            converter = (HbaseConverter) Class.forName(configuration.get(CONVERTER_KEY)).newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+        }
     }
 
     protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
index 3e19b07..bf33eed 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -70,6 +70,11 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
             public byte[] toBytes() {
                 return bytes;
             }
+
+            @Override
+            public void fromBytes(byte[] in) {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
new file mode 100644
index 0000000..1e95507
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+public enum ConnectionType {
+   POLL, DISCOVER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
new file mode 100644
index 0000000..ddf542e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+public class TableInfo {
+    private String tableName;
+    private String columnFamily;
+    public TableInfo(String s) {
+        Iterable<String> i = Splitter.on(":").split(s);
+        if(Iterables.size(i) != 2) {
+            throw new IllegalStateException("Malformed table:cf => " + s);
+        }
+        tableName = Iterables.getFirst(i, null);
+        columnFamily = Iterables.getLast(i);
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TableInfo tableInfo = (TableInfo) o;
+
+        if (getTableName() != null ? !getTableName().equals(tableInfo.getTableName()) : tableInfo.getTableName() != null)
+            return false;
+        return getColumnFamily() != null ? getColumnFamily().equals(tableInfo.getColumnFamily()) : tableInfo.getColumnFamily() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = getTableName() != null ? getTableName().hashCode() : 0;
+        result = 31 * result + (getColumnFamily() != null ? getColumnFamily().hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "TableInfo{" +
+                "tableName='" + tableName + '\'' +
+                ", columnFamily='" + columnFamily + '\'' +
+                '}';
+    }
+}
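
TableInfo accepts exactly one "table:cf" pair and rejects anything else; for example (names illustrative):

    TableInfo info = new TableInfo("threat_intel:t");
    info.getTableName();      // "threat_intel"
    info.getColumnFamily();   // "t"
    new TableInfo("threat_intel"); // throws IllegalStateException: Malformed table:cf => threat_intel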

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
new file mode 100644
index 0000000..dab8f0c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
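+/**
+ * Connection settings for a TAXII feed: endpoint, port, proxy, credentials, collection,
+ * subscription id, poll begin time, and the mapping from indicator type to the HBase
+ * table and column family where extracted indicators are stored.  Instances can be
+ * built fluently via the with* methods or deserialized from JSON via load().
+ */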
+public class TaxiiConnectionConfig {
+    final static ObjectMapper _mapper = new ObjectMapper();
+    private URL endpoint;
+    private int port = 443;
+    private URL proxy;
+    private String username;
+    private String password;
+    private ConnectionType type;
+    private String collection = "default";
+    private String subscriptionId = null;
+    private Date beginTime;
+    private Map<String, TableInfo> tableMap;
+    public TaxiiConnectionConfig withTableMap(Map<String, TableInfo> tableMap) {
+        this.tableMap = tableMap;
+        return this;
+    }
+    public TaxiiConnectionConfig withBeginTime(Date time) {
+        this.beginTime = time;
+        return this;
+    }
+    public TaxiiConnectionConfig withSubscriptionId(String subId) {
+        this.subscriptionId = subId;
+        return this;
+    }
+    public TaxiiConnectionConfig withCollection(String collection) {
+        this.collection = collection;
+        return this;
+    }
+
+    public TaxiiConnectionConfig withPort(int port) {
+        this.port = port;
+        return this;
+    }
+    public TaxiiConnectionConfig withEndpoint(URL endpoint) {
+        this.endpoint = endpoint;
+        return this;
+    }
+    public TaxiiConnectionConfig withProxy(URL proxy) {
+        this.proxy = proxy;
+        return this;
+    }
+    public TaxiiConnectionConfig withUsername(String username) {
+        this.username = username;
+        return this;
+    }
+    public TaxiiConnectionConfig withPassword(String password) {
+        this.password = password;
+        return this;
+    }
+    public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
+        this.type = type;
+        return this;
+    }
+
+    public void setEndpoint(String endpoint) throws MalformedURLException {
+        this.endpoint = new URL(endpoint);
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public void setProxy(String proxy) throws MalformedURLException {
+        this.proxy = new URL(proxy);
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public void setType(ConnectionType type) {
+        this.type = type;
+    }
+
+    public void setCollection(String collection) {
+        this.collection = collection;
+    }
+
+    public void setSubscriptionId(String subscriptionId) {
+        this.subscriptionId = subscriptionId;
+    }
+
+    public void setBeginTime(String beginTime) throws ParseException {
+        DateFormat df = DateFormat.getDateInstance(DateFormat.MEDIUM);
+        this.beginTime = df.parse(beginTime);
+    }
+
+    public void setTableMap(Map<String, String> tableMap) {
+        this.tableMap = new HashMap<>();
+        for(Map.Entry<String, String> kv : tableMap.entrySet()) {
+            this.tableMap.put(kv.getKey(), new TableInfo(kv.getValue()));
+        }
+    }
+
+    public Map<String, TableInfo> getTableMap() {
+        return tableMap;
+    }
+
+    public Date getBeginTime() {
+        return beginTime;
+    }
+    public int getPort() {
+        return port;
+    }
+    public URL getEndpoint() {
+        return endpoint;
+    }
+
+    public URL getProxy() {
+        return proxy;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public ConnectionType getType() {
+        return type;
+    }
+
+    public String getCollection() {
+        return collection;
+    }
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
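+    /**
+     * Deserialize a TaxiiConnectionConfig from a JSON stream using Jackson.
+     */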
+    public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
+        TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
+        return ret;
+    }
+    public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
+        return load( new ByteArrayInputStream(s.getBytes(c)));
+    }
+    public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
+        return load( s, Charset.defaultCharset());
+    }
+
+    @Override
+    public String toString() {
+        return "TaxiiConnectionConfig{" +
+                "endpoint=" + endpoint +
+                ", port=" + port +
+                ", proxy=" + proxy +
+                ", username='" + username + '\'' +
+                ", password=" + (password == null?"null" : "'******'") +
+                ", type=" + type +
+                ", collection='" + collection + '\'' +
+                ", subscriptionId='" + subscriptionId + '\'' +
+                ", beginTime=" + beginTime +
+                ", tableMap=" + tableMap +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
new file mode 100644
index 0000000..614adec
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.mitre.taxii.client.HttpClient;
+import org.mitre.taxii.messages.xml11.*;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
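+/**
+ * A TimerTask that polls a TAXII feed, extracts threat intel indicators from the
+ * returned STIX content, and writes each indicator to the HBase table configured
+ * for its indicator type.
+ */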
+public class TaxiiHandler extends TimerTask {
+    private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
+
+    private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() {
+        @Override
+        protected TaxiiXmlFactory initialValue() {
+            return new TaxiiXmlFactory();
+        }
+    };
+    private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() {
+        @Override
+        protected ObjectFactory initialValue() {
+            return new ObjectFactory();
+        }
+    };
+
+    private HttpClient taxiiClient;
+    private URL endpoint;
+    private Extractor extractor;
+    private Map<String, TableInfo> tableMap;
+    private Map<TableInfo, HTableInterface> connectionCache = new HashMap<>();
+    private HttpClientContext context;
+    private String collection;
+    private String subscriptionId;
+    private ThreatIntelConverter converter = new ThreatIntelConverter();
+    private Date beginTime;
+    private Configuration config;
+    private boolean inProgress = false;
+    public TaxiiHandler( TaxiiConnectionConfig connectionConfig
+                       , Extractor extractor
+                       , Configuration config
+                       ) throws Exception
+    {
+        LOG.info("Loading configuration: " + connectionConfig);
+        this.extractor = extractor;
+        this.collection = connectionConfig.getCollection();
+        this.subscriptionId = connectionConfig.getSubscriptionId();
+        this.tableMap = connectionConfig.getTableMap();
+        this.beginTime = connectionConfig.getBeginTime();
+        this.config = config;
+        initializeClient(connectionConfig);
+        LOG.info("Configured, starting polling " + endpoint + " for " + collection);
+    }
+
+    protected synchronized HTableInterface getTable(TableInfo tableInfo) throws IOException {
+        HTableInterface ret = connectionCache.get(tableInfo);
+        if(ret == null) {
+            ret = createHTable(tableInfo);
+            connectionCache.put(tableInfo, ret);
+        }
+        return ret;
+    }
+
+    protected synchronized HTableInterface createHTable(TableInfo tableInfo) throws IOException {
+        return new HTable(config, tableInfo.getTableName());
+    }
+    /**
+     * The action to be performed by this timer task: poll the TAXII feed once and
+     * persist any newly extracted threat intel indicators to HBase.
+     */
+    @Override
+    public void run() {
+        if(inProgress) {
+            return;
+        }
+        Date ts = new Date();
+        LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+        try {
+            inProgress = true;
+            // Prepare the message to send.
+            String sessionID = MessageHelper.generateMessageId();
+            PollRequest request = messageFactory.get().createPollRequest()
+                    .withMessageId(sessionID)
+                    .withCollectionName(collection);
+            if (subscriptionId != null) {
+                request = request.withSubscriptionID(subscriptionId);
+            } else {
+                request = request.withPollParameters(messageFactory.get().createPollParametersType());
+            }
+            if (beginTime != null) {
+                Calendar gc = GregorianCalendar.getInstance();
+                gc.setTime(beginTime);
+                XMLGregorianCalendar gTime = null;
+                try {
+                    gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar((GregorianCalendar) gc).normalize();
+                } catch (DatatypeConfigurationException e) {
+                    LOG.error("Unable to set the begin time", e);
+                }
+                if (gTime != null) {
+                    gTime.setFractionalSecond(null);
+                    LOG.info("Begin Time: " + gTime);
+                    request.setExclusiveBeginTimestamp(gTime);
+                }
+            }
+
+            try {
+                PollResponse response = call(request, PollResponse.class);
+                LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
+                int numProcessed = 0;
+                long avgTimeMS = 0;
+                long timeStartedBlock = System.currentTimeMillis();
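+                // Each content block may contain multiple STIX documents; run each one
+                // through the extractor and persist the indicators it yields.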
+                for (ContentBlock block : response.getContentBlocks()) {
+                    AnyMixedContentType content = block.getContent();
+                    for (Object o : content.getContent()) {
+                        numProcessed++;
+                        long timeS = System.currentTimeMillis();
+                        String xml = null;
+                        if (o instanceof Element) {
+                            Element element = (Element) o;
+                            xml = getStringFromDocument(element.getOwnerDocument());
+                            if(LOG.isDebugEnabled() && Math.random() < 0.01) {
+                                LOG.debug("Random Stix doc: " + xml);
+                            }
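+                            // Write each extracted indicator to the HBase table mapped to its
+                            // indicator type, tagging it with the source feed metadata.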
+                            for (LookupKV<ThreatIntelKey, ThreatIntelValue> kv : extractor.extract(xml)) {
+                                String indicatorType = kv.getValue().getMetadata().get("indicator-type");
+                                TableInfo tableInfo = tableMap.get(indicatorType);
+                                boolean persisted = false;
+                                if (tableInfo != null) {
+                                    kv.getValue().getMetadata().put("source_type", "taxii");
+                                    kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
+                                    kv.getValue().getMetadata().put("taxii_collection", collection);
+                                    Put p = converter.toPut(tableInfo.getColumnFamily(), kv.getKey(), kv.getValue());
+                                    HTableInterface table = getTable(tableInfo);
+                                    table.put(p);
+                                    persisted = true;
+                                }
+                                LOG.info("Found Threat Intel: " + persisted + ", " + kv.getKey() + " => " + kv.getValue());
+                            }
+                        }
+                        avgTimeMS += System.currentTimeMillis() - timeS;
+                    }
+                    if( (numProcessed + 1) % 100 == 0) {
+                        LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + avgTimeMS / content.getContent().size());
+                        timeStartedBlock = System.currentTimeMillis();
+                        avgTimeMS = 0;
+                        numProcessed = 0;
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+                throw new RuntimeException("Unable to make request", e);
+            }
+        }
+        finally {
+            inProgress = false;
+            beginTime = ts;
+        }
+    }
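+    /**
+     * Serialize a DOM Document back to an XML string so the STIX content can be
+     * handed to the extractor.
+     */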
+    public String getStringFromDocument(Document doc)
+    {
+        try
+        {
+            DOMSource domSource = new DOMSource(doc);
+            StringWriter writer = new StringWriter();
+            StreamResult result = new StreamResult(writer);
+            TransformerFactory tf = TransformerFactory.newInstance();
+            Transformer transformer = tf.newTransformer();
+            transformer.transform(domSource, result);
+            return writer.toString();
+        }
+        catch(TransformerException ex)
+        {
+            LOG.error("Unable to serialize the STIX document to XML", ex);
+            return null;
+        }
+    }
+    private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
+        return call(taxiiClient, endpoint.toURI(), request, context, responseClazz);
+    }
+
+    private void initializeClient(TaxiiConnectionConfig config) throws Exception {
+        LOG.info("Initializing client..");
+        if(context == null) {
+            context = createContext(config.getEndpoint(), config.getUsername(), config.getPassword(), config.getPort());
+        }
+        URL endpoint = config.getEndpoint();
+        if(config.getType() == ConnectionType.DISCOVER) {
+            LOG.info("Discovering endpoint");
+            endpoint = discoverPollingClient(config.getProxy(), endpoint, config.getUsername(), config.getPassword(), context, collection).pollEndpoint;
+            LOG.info("Discovered endpoint as " + endpoint);
+        }
+        // Always record the poll endpoint, whether it was discovered or configured directly,
+        // so that call() has a target even when discovery is not used.
+        this.endpoint = endpoint;
+        taxiiClient = buildClient(config.getProxy(), config.getUsername(), config.getPassword());
+    }
+
+    private static class DiscoveryResults {
+        URL pollEndpoint;
+        URL collectionManagementEndpoint;
+        List<String> collections = new ArrayList<>();
+    }
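+    /**
+     * Issue a TAXII discovery request to locate the poll and collection management
+     * services.  If no collection was specified, the available collections are listed
+     * and the process exits.
+     */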
+    private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+
+        DiscoveryResults results = new DiscoveryResults();
+        {
+            HttpClient discoverClient = buildClient(proxy, username, password);
+            String sessionID = MessageHelper.generateMessageId();
+            // Prepare the message to send.
+            DiscoveryRequest request = messageFactory.get().createDiscoveryRequest()
+                    .withMessageId(sessionID);
+            DiscoveryResponse response = call(discoverClient, endpoint.toURI(), request, context, DiscoveryResponse.class);
+            for (ServiceInstanceType serviceInstance : response.getServiceInstances()) {
+                if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
+                    results.pollEndpoint = new URL(serviceInstance.getAddress());
+                }
+                else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
+                    results.collectionManagementEndpoint= new URL(serviceInstance.getAddress());
+                }
+            }
+            if (results.pollEndpoint == null) {
+                throw new RuntimeException("Unable to discover a poll TAXII feed");
+            }
+        }
+        // No collection was specified, so list the available collections and exit.
+        if(defaultCollection == null) {
+            HttpClient discoverClient = buildClient(proxy, username, password);
+            String sessionID = MessageHelper.generateMessageId();
+            CollectionInformationRequest request = messageFactory.get().createCollectionInformationRequest()
+                                                                 .withMessageId(sessionID);
+            CollectionInformationResponse response = call(discoverClient, results.collectionManagementEndpoint.toURI(), request, context, CollectionInformationResponse.class);
+            LOG.info("Unable to find the default collection; available collections are:");
+            for(CollectionRecordType c : response.getCollections()) {
+                LOG.info(c.getCollectionName());
+                results.collections.add(c.getCollectionName());
+            }
+            System.exit(0);
+        }
+        return results;
+    }
+
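+    /**
+     * Build an HttpClientContext configured for preemptive basic authentication when a
+     * username and password are supplied; returns null when no credentials are configured.
+     */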
+    private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
+        HttpClientContext context = null;
+        HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
+        if (username != null && password != null) {
+
+            CredentialsProvider credsProvider = new BasicCredentialsProvider();
+            credsProvider.setCredentials(
+                    new AuthScope(target.getHostName(), target.getPort()),
+                    new UsernamePasswordCredentials(username, password));
+
+            // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
+            AuthCache authCache = new BasicAuthCache();
+            authCache.put(target, new BasicScheme());
+
+            // Add AuthCache to the execution context
+            context = HttpClientContext.create();
+            context.setCredentialsProvider(credsProvider);
+            context.setAuthCache(authCache);
+        }
+        return context;
+    }
+
+
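+    /**
+     * Call the TAXII service and cast the response to the expected message type; if the
+     * server returns a different type, log the marshalled response and fail.
+     */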
+    public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
+            , URI endpoint
+            , REQUEST_T request
+            , HttpClientContext context
+            , Class<RESPONSE_T> responseClazz
+    ) throws JAXBException, IOException {
+        //TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+        //String req = taxiiXml.marshalToString(request, true);
+        // Call the service
+        Object responseObj =  taxiiClient.callTaxiiService(endpoint, request, context);
+        LOG.info("Request made : " + request.getClass().getCanonicalName() + " => " + responseObj.getClass().getCanonicalName() + " (expected " + responseClazz.getCanonicalName() + ")");
+        //String resp = taxiiXml.marshalToString(responseObj, true);
+        try {
+            return responseClazz.cast(responseObj);
+        }
+        catch(ClassCastException cce) {
+            TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+            String resp = taxiiXml.marshalToString(responseObj, true);
+            String msg = "Didn't return the response we expected: " + responseObj.getClass() + " \n" + resp;
+            LOG.error(msg, cce);
+            throw new RuntimeException(msg, cce);
+        }
+    }
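+    /**
+     * Build the TAXII HTTP client with an optional proxy and an SSL socket factory
+     * that trusts self-signed certificates.
+     */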
+    private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
+    {
+        HttpClient client = new HttpClient(); // Start with a default TAXII HTTP client.
+
+        // Create an Apache HttpClientBuilder to be customized by the command line arguments.
+        HttpClientBuilder builder = HttpClientBuilder.create().useSystemProperties();
+
+        // Proxy
+        if (proxy != null) {
+            HttpHost proxyHost = new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol());
+            builder.setProxy(proxyHost);
+        }
+
+        // Basic authentication. User & Password
+        if (username != null ^ password != null) {
+            throw new Exception("'username' and 'password' arguments are required to appear together.");
+        }
+
+
+        // from:  http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
+        SSLContextBuilder ssbldr = new SSLContextBuilder();
+        ssbldr.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(ssbldr.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+
+        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+                .register("http", new PlainConnectionSocketFactory())
+                .register("https", sslsf)
+                .build();
+
+
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
+        cm.setMaxTotal(20); // maximum total connections in the pool
+
+        System.setProperty("jsse.enableSNIExtension", "false");
+        CloseableHttpClient httpClient = builder
+                .setSSLSocketFactory(sslsf)
+                .setConnectionManager(cm)
+                .build();
+
+        client.setHttpclient(httpClient);
+        return client;
+    }
+    public static void main(String... argv) throws Exception {
+        URL endpoint = new URL("http://hailataxii.com/taxii-discovery-service");
+        String username = "guest";
+        String password = "guest";
+        TaxiiConnectionConfig config = new TaxiiConnectionConfig();
+        config = config.withConnectionType(ConnectionType.DISCOVER)
+                       .withEndpoint(endpoint)
+                       .withUsername(username)
+                       .withCollection("guest.Abuse_ch")
+                       .withPassword(password);
+        //TaxiiHandler handler = new TaxiiHandler(config, null);
+        //handler.run();
+        //discoverPollingClient(null, endpoint, username, password, context);
+    }
+}