You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/07 14:56:37 UTC

[4/6] incubator-metron git commit: METRON-93: Generalize the HBase threat intel infrastructure to support enrichments closes apache/incubator-metron#64

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index 233550b..ae839d2 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -18,10 +18,9 @@
 package org.apache.metron.dataloads.extractor.stix.types;
 
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.DomainName;
 import org.mitre.cybox.objects.DomainNameTypeEnum;
@@ -30,30 +29,49 @@ import java.io.IOException;
 import java.util.*;
 
 public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
-    EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
-    public DomainHandler() {
-        super(DomainName.class);
-    }
+  public static final String TYPE_CONFIG = "stix_domain_type"; // config key that, when present, overrides the default type string used by extract()
+  EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN); // extract() skips domain names whose type is set but is not in this set
+  public DomainHandler() { // passes DomainName.class to the superclass constructor
+    super(DomainName.class);
+  }
 
-    @Override
-    public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
-        List<LookupKV> ret = new ArrayList<>();
-        final DomainNameTypeEnum domainType = type.getType();
-        if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
-            StringObjectPropertyType value = type.getValue();
-            for (String token : StixExtractor.split(value)) {
-                LookupKV results = new LookupKV(new ThreatIntelKey(token)
-                                               , new ThreatIntelValue(
-                                                    new HashMap<String, String>() {{
-                                                        put("source-type", "STIX");
-                                                        put("indicator-type", type.getClass().getSimpleName() + ":" + DomainNameTypeEnum.FQDN);
-                                                        put("source", type.toXMLString());
-                                                    }}
-                                                                    )
-                                               );
-                ret.add(results);
-            }
-        }
-        return ret;
+  /**
+   * Extracts one lookup key/value pair per token found in the domain name's value.
+   *
+   * <p>The type string defaults to {@code getType()} and is replaced by the
+   * {@link #TYPE_CONFIG} entry of {@code config} when that entry is present.  A domain name
+   * whose type is set but is not in {@code SUPPORTED_TYPES} yields no output.
+   *
+   * @param type the domain name object to extract from
+   * @param config extractor configuration; may be {@code null}
+   * @return the extracted pairs, possibly empty
+   * @throws IOException declared by the handler interface; nothing in this body throws it directly
+   */
+  @Override
+  public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+    List<LookupKV> ret = new ArrayList<>();
+    String typeStr = getType();
+    if(config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if(o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final DomainNameTypeEnum domainType = type.getType();
+    if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
+      // The indicator type does not depend on the token, so it is built once.
+      final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
+      StringObjectPropertyType value = type.getValue();
+      for (String token : StixExtractor.split(value)) {
+        EnrichmentKey key = new EnrichmentKey(indicatorType, token);
+        // A plain HashMap replaces the former double-brace initializer, which created an
+        // anonymous HashMap subclass holding a reference to this handler.
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("source-type", "STIX");
+        metadata.put("indicator-type", indicatorType);
+        metadata.put("source", type.toXMLString());
+        ret.add(new LookupKV(key, new EnrichmentValue(metadata)));
+      }
+    }
+    return ret;
+  }
+  @Override
+  public List<String> getPossibleTypes() { // one entry per supported domain type, formatted as getType() + ":" + enum value; a TYPE_CONFIG override is not reflected because no config is passed in
+    String typeStr = getType();
+    List<String> ret = new ArrayList<>();
+    for(DomainNameTypeEnum e : SUPPORTED_TYPES)
+    {
+       ret.add(typeStr + ":" + e);
     }
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index 702c440..ab02440 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -18,11 +18,11 @@
 
 package org.apache.metron.dataloads.extractor.stix.types;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.StringObjectPropertyType;
 import org.mitre.cybox.objects.Hostname;
 
@@ -33,25 +33,38 @@ import java.util.List;
 import java.util.Map;
 
 public class HostnameHandler  extends AbstractObjectTypeHandler<Hostname>{
-    public HostnameHandler() {
-        super(Hostname.class);
-    }
+  public static final String TYPE_CONFIG = "stix_hostname_type"; // config key that, when present, overrides the default type string used by extract()
+  public HostnameHandler() { // passes Hostname.class to the superclass constructor
+    super(Hostname.class);
+  }
 
-    @Override
-    public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
-        StringObjectPropertyType value = type.getHostnameValue();
-        List<LookupKV> ret = new ArrayList<>();
-        for(String token : StixExtractor.split(value)) {
-            LookupKV results = new LookupKV(new ThreatIntelKey(token)
-                                           , new ThreatIntelValue(new HashMap<String, String>() {{
-                                                                        put("source-type", "STIX");
-                                                                        put("indicator-type", type.getClass().getSimpleName());
-                                                                        put("source", type.toXMLString());
-                                                                    }}
-                                                                 )
-                                           );
-                ret.add(results);
-        }
-        return ret;
+  /**
+   * Extracts one lookup key/value pair per token found in the hostname value.
+   *
+   * <p>The type string defaults to {@code getType()} and is replaced by the
+   * {@link #TYPE_CONFIG} entry of {@code config} when that entry is present.
+   *
+   * @param type the hostname object to extract from
+   * @param config extractor configuration; may be {@code null}
+   * @return the extracted pairs, possibly empty
+   * @throws IOException declared by the handler interface; nothing in this body throws it directly
+   */
+  @Override
+  public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
+    StringObjectPropertyType value = type.getHostnameValue();
+    String typeStr = getType();
+    if(config != null) {
+      Object o = config.get(TYPE_CONFIG);
+      if(o != null) {
+        typeStr = o.toString();
+      }
+    }
+    final String indicatorType = typeStr;
+    List<LookupKV> ret = new ArrayList<>();
+    for(String token : StixExtractor.split(value)) {
+      EnrichmentKey key = new EnrichmentKey(indicatorType, token);
+      // A plain HashMap replaces the former double-brace initializer, which created an
+      // anonymous HashMap subclass holding a reference to this handler.
+      HashMap<String, String> metadata = new HashMap<>();
+      metadata.put("source-type", "STIX");
+      metadata.put("indicator-type", indicatorType);
+      metadata.put("source", type.toXMLString());
+      ret.add(new LookupKV(key, new EnrichmentValue(metadata)));
     }
+    return ret;
+  }
+  @Override
+  public List<String> getPossibleTypes() { // single entry: the default from getType(); a TYPE_CONFIG override is not reflected because no config is passed in
+    return ImmutableList.of(getType());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
index e5a5296..57d72ff 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -17,15 +17,15 @@
  */
 package org.apache.metron.dataloads.extractor.stix.types;
 
-import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
 import org.mitre.cybox.common_2.ObjectPropertiesType;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
-    Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
-    Class<T> getTypeClass();
+  Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException; // turns one object of type T into lookup key/value pairs, optionally tuned by config
+  Class<T> getTypeClass(); // the object class this handler works on; NOTE(review): presumably the class given to the handler's constructor - confirm in AbstractObjectTypeHandler
+  List<String> getPossibleTypes(); // the type strings this handler may attach to the pairs it extracts
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
index 04714d9..5baa3a5 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -26,8 +26,6 @@ import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.hbase.converters.HbaseConverter;
 import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
 
 import java.io.IOException;
 
@@ -39,7 +37,6 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
     public static final String CONVERTER_KEY = "bl_converter";
     Extractor extractor = null;
     String columnFamily = null;
-    Long lastSeen = null;
     HbaseConverter converter;
     @Override
     public void setup(Context context) throws IOException,
@@ -61,7 +58,6 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
         String configStr = configuration.get(CONFIG_KEY);
         extractor = ExtractorHandler.load(configStr).getExtractor();
         columnFamily = configuration.get(COLUMN_FAMILY_KEY);
-        lastSeen = Long.parseLong(configuration.get(LAST_SEEN_KEY));
         try {
             converter = (HbaseConverter) Class.forName(configuration.get(CONVERTER_KEY)).newInstance();
         } catch (InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
index bf33eed..b418a80 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -48,7 +48,7 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
         }
         try {
             tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
-        } catch (Exception e) {
+        } catch (Throwable e) {
             throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
new file mode 100644
index 0000000..816311f
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.cli.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.enrichment.EnrichmentConfig;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+public class SimpleEnrichmentFlatFileLoader { // command-line loader: runs input file(s) through an Extractor and writes the resulting Puts to an HBase table (see main)
+  private static abstract class OptionHandler implements Function<String, Option> {} // maps an option's short code to its commons-cli Option definition
+  public static enum LoadOptions { // the command-line options of the loader; each constant pairs a short code with a handler that builds its Option
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,HBASE_TABLE("t", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
+        o.setArgName("TABLE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,HBASE_CF("c", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
+        o.setArgName("CF");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+        o.setArgName("JSON_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "enrichment_config", true
+                , "JSON Document describing the enrichment configuration details." +
+                "  This is used to associate an enrichment type with a field type in zookeeper."
+        );
+        o.setArgName("JSON_FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,LOG4J_PROPERTIES("l", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+        o.setArgName("FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,INPUT("i", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "The CSV File to load");
+        o.setArgName("FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ;
+    Option option; // the Option built once by the handler in the constructor
+    String shortCode; // short option name used for all command-line lookups
+    LoadOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode); // the handler receives the short code as the option name
+    }
+
+    public boolean has(CommandLine cli) { // true when this option was given on the command line
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) { // the option's argument value as returned by commons-cli
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) { // parses args against all options; terminates the JVM on help or on a parse error
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(HELP.has(cli)) {
+          printHelp();
+          System.exit(0); // help requested: exit with status 0
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1); // unparseable arguments: exit with a failure status
+        return null; // required by the compiler, which does not treat System.exit as ending the method
+      }
+    }
+
+    public static void printHelp() { // prints the usage text for all options
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
+    }
+
+    public static Options getOptions() { // builds a new Options holding the Option of every constant
+      Options ret = new Options();
+      for(LoadOptions o : LoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+  public static List<File> getFiles(File root) {
+    if(!root.isDirectory())  {
+      return ImmutableList.of(root);
+    }
+    List<File> ret = new ArrayList<>();
+    Stack<File> stack = new Stack<File>();
+    stack.push(root);
+    while(!stack.isEmpty()) {
+      File f = stack.pop();
+      if(f.isDirectory()) {
+        for(File child : f.listFiles()) {
+          stack.push(child);
+        }
+      }
+      else {
+        ret.add(f);
+      }
+    }
+    return ret;
+  }
+
+  public HTableProvider getProvider() { // supplies the table provider used by main(); NOTE(review): looks like an override point for tests - confirm
+    return new HTableProvider();
+  }
+
+  /**
+   * Runs the extractor over one chunk of input and converts every result into a Put.
+   *
+   * @param line the input text handed to the extractor (a single line or a whole file)
+   * @param extractor produces the lookup key/value pairs
+   * @param cf the column family passed to the converter
+   * @param converter turns each key/value pair into a Put
+   * @return one Put per extracted pair, in extraction order
+   * @throws IOException if extraction or conversion fails
+   */
+  public List<Put> extract( String line
+                     , Extractor extractor
+                     , String cf
+                     , HbaseConverter converter
+                     ) throws IOException
+  {
+    List<Put> puts = new ArrayList<>();
+    Iterable<LookupKV> extracted = extractor.extract(line);
+    for(LookupKV entry : extracted) {
+      puts.add(converter.toPut(cf, entry.getKey(), entry.getValue()));
+    }
+    return puts;
+  }
+
+
+  /**
+   * Loads one input file into the given table.
+   *
+   * @param inputFile the file to read
+   * @param extractor produces the lookup key/value pairs from the file content
+   * @param table the table the resulting Puts are written to
+   * @param cf the column family passed to the converter
+   * @param converter turns each key/value pair into a Put
+   * @param lineByLine when {@code true} the extractor is fed one line at a time and the
+   *                   table is written after each line; otherwise the whole file content
+   *                   is extracted and written in a single call
+   * @throws IOException if reading the file, extracting or writing to the table fails
+   */
+  public void loadFile( File inputFile
+                      , Extractor extractor
+                      , HTableInterface table
+                      , String cf
+                      , HbaseConverter converter
+                      , boolean lineByLine
+                      ) throws IOException
+  {
+    if(!lineByLine) {
+      table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter));
+    }
+    else {
+      // try-with-resources closes the reader even when extraction or the put fails;
+      // previously the reader was never closed.
+      try (BufferedReader br = new BufferedReader(new FileReader(inputFile))) {
+        for(String line = null;(line = br.readLine()) != null;) {
+          table.put(extract(line, extractor, cf, converter));
+        }
+      }
+    }
+  }
+  public static void main(String... argv) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+    CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+    if(LoadOptions.LOG4J_PROPERTIES.has(cli)) {
+      PropertyConfigurator.configure(LoadOptions.LOG4J_PROPERTIES.get(cli));
+    }
+    ExtractorHandler handler = ExtractorHandler.load(
+            FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+    );
+    boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
+    Extractor e = handler.getExtractor();
+    EnrichmentConfig enrichmentConfig = null;
+    if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
+      enrichmentConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
+              , EnrichmentConfig.class
+      );
+    }
+    HbaseConverter converter = new EnrichmentConverter();
+    List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
+    SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
+    HTableInterface table = loader.getProvider()
+            .getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+
+    for (File f : inputFiles) {
+      loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine);
+    }
+    if(enrichmentConfig != null) {
+      enrichmentConfig.updateSensorConfigs();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
new file mode 100644
index 0000000..77d1698
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+public enum ConnectionType { // NOTE(review): presumably the kind of TAXII service to talk to (poll vs. discovery) - confirm against TaxiiConnectionConfig usage
+   POLL, DISCOVER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
new file mode 100644
index 0000000..6bbf8e3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * A table name / column family pair parsed from a {@code table:cf} string.
+ */
+public class TableInfo {
+    private String tableName;
+    private String columnFamily;
+
+    /**
+     * Parses a {@code table:cf} specification.
+     *
+     * @param s the specification; splitting it on {@code ':'} must give exactly two parts
+     * @throws IllegalStateException if the number of parts is not two
+     */
+    public TableInfo(String s) {
+        Iterable<String> parts = Splitter.on(":").split(s);
+        if(Iterables.size(parts) == 2) {
+            tableName = Iterables.getFirst(parts, null);
+            columnFamily = Iterables.getLast(parts);
+            return;
+        }
+        throw new IllegalStateException("Malformed table:cf => " + s);
+    }
+
+    /** @return the part before the colon */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /** @return the part after the colon */
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    /** Two instances are equal when they are of the same class and both parts match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        TableInfo that = (TableInfo) o;
+        return sameOrBothNull(getTableName(), that.getTableName())
+            && sameOrBothNull(getColumnFamily(), that.getColumnFamily());
+    }
+
+    /** Null-tolerant string equality. */
+    private static boolean sameOrBothNull(String a, String b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /** Combines the hashes of both parts; a null part contributes 0. */
+    @Override
+    public int hashCode() {
+        String table = getTableName();
+        String family = getColumnFamily();
+        int tableHash = table == null ? 0 : table.hashCode();
+        int familyHash = family == null ? 0 : family.hashCode();
+        return 31 * tableHash + familyHash;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("TableInfo{");
+        sb.append("tableName='").append(tableName).append('\'');
+        sb.append(", columnFamily='").append(columnFamily).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
new file mode 100644
index 0000000..678f98b
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Settings describing how to reach a TAXII service (endpoint, port, proxy,
+ * credentials, collection, subscription, begin time) together with the
+ * table and column family the polled data is destined for.
+ *
+ * <p>An instance can be assembled fluently through the {@code withXxx}
+ * methods, populated through the bean-style setters, or read from a JSON
+ * document with one of the {@code load} methods.
+ */
+public class TaxiiConnectionConfig {
+  final static ObjectMapper _mapper = new ObjectMapper();
+  private URL endpoint;
+  private int port = 443;
+  private URL proxy;
+  private String username;
+  private String password;
+  private ConnectionType type;
+  private String collection = "default";
+  private String subscriptionId = null;
+  private Date beginTime;
+  private String table;
+  private String columnFamily;
+  private Set<String> allowedIndicatorTypes = new HashSet<String>();
+
+  // ---------------------------------------------------------------------
+  // Fluent mutators: each one stores its argument and returns this instance.
+  // ---------------------------------------------------------------------
+
+  /**
+   * Replaces the allowed indicator types with a copy of the given list
+   * (duplicates collapse because the values are kept in a set).
+   *
+   * @param indicatorTypes the indicator types to allow; must not be null
+   * @return this instance
+   */
+  public TaxiiConnectionConfig withAllowedIndicatorTypes(List<String> indicatorTypes) {
+    // Parameterized copy; the raw HashSet used before produced an unchecked assignment.
+    allowedIndicatorTypes = new HashSet<String>(indicatorTypes);
+    return this;
+  }
+
+  public TaxiiConnectionConfig withTable(String table) {
+    this.table = table;
+    return this;
+  }
+  public TaxiiConnectionConfig withColumnFamily(String cf) {
+    this.columnFamily = cf;
+    return this;
+  }
+  public TaxiiConnectionConfig withBeginTime(Date time) {
+    this.beginTime = time;
+    return this;
+  }
+  public TaxiiConnectionConfig withSubscriptionId(String subId) {
+    this.subscriptionId = subId;
+    return this;
+  }
+  public TaxiiConnectionConfig withCollection(String collection) {
+    this.collection = collection;
+    return this;
+  }
+
+  public TaxiiConnectionConfig withPort(int port) {
+    this.port = port;
+    return this;
+  }
+  public TaxiiConnectionConfig withEndpoint(URL endpoint) {
+    this.endpoint = endpoint;
+    return this;
+  }
+  public TaxiiConnectionConfig withProxy(URL proxy) {
+    this.proxy = proxy;
+    return this;
+  }
+  public TaxiiConnectionConfig withUsername(String username) {
+    this.username = username;
+    return this;
+  }
+  public TaxiiConnectionConfig withPassword(String password) {
+    this.password = password;
+    return this;
+  }
+  public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
+    this.type= type;
+    return this;
+  }
+
+  // ---------------------------------------------------------------------
+  // Bean-style setters. Several accept a String and convert it.
+  // ---------------------------------------------------------------------
+
+  /**
+   * @param endpoint the endpoint as a URL string
+   * @throws MalformedURLException if the string is not a valid URL
+   */
+  public void setEndpoint(String endpoint) throws MalformedURLException {
+    this.endpoint = new URL(endpoint);
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  /**
+   * @param proxy the proxy as a URL string
+   * @throws MalformedURLException if the string is not a valid URL
+   */
+  public void setProxy(String proxy) throws MalformedURLException {
+    this.proxy = new URL(proxy);
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setType(ConnectionType type) {
+    this.type = type;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+
+  public void setSubscriptionId(String subscriptionId) {
+    this.subscriptionId = subscriptionId;
+  }
+
+  /**
+   * Parses the begin time using the {@link DateFormat#MEDIUM} date style of
+   * the default locale.
+   *
+   * @param beginTime the date text to parse
+   * @throws ParseException if the text cannot be parsed in that style
+   */
+  public void setBeginTime(String beginTime) throws ParseException {
+    // Only DateFormat#parse is needed, so no downcast to SimpleDateFormat
+    // (the factory method does not promise that concrete type).
+    DateFormat format = DateFormat.getDateInstance(DateFormat.MEDIUM);
+    this.beginTime = format.parse(beginTime);
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  public void setColumnFamily(String columnFamily) {
+    this.columnFamily = columnFamily;
+  }
+
+  // ---------------------------------------------------------------------
+  // Accessors
+  // ---------------------------------------------------------------------
+
+  public Date getBeginTime() {
+    return beginTime;
+  }
+  public int getPort() {
+    return port;
+  }
+  public URL getEndpoint() {
+    return endpoint;
+  }
+
+  public URL getProxy() {
+    return proxy;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public ConnectionType getType() {
+    return type;
+  }
+
+  public String getCollection() {
+    return collection;
+  }
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  /** Setter counterpart of {@link #withAllowedIndicatorTypes(List)}. */
+  public void setAllowedIndicatorTypes(List<String> allowedIndicatorTypes) {
+    withAllowedIndicatorTypes(allowedIndicatorTypes);
+  }
+
+  public Set<String> getAllowedIndicatorTypes() {
+    return allowedIndicatorTypes;
+  }
+
+  // ---------------------------------------------------------------------
+  // Loading
+  // ---------------------------------------------------------------------
+
+  /**
+   * Reads a configuration from the given stream using the shared object mapper.
+   *
+   * @param is the stream holding the serialized configuration
+   * @return the configuration read from the stream
+   * @throws IOException if reading or mapping fails
+   */
+  public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
+    TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
+    return ret;
+  }
+
+  /**
+   * Reads a configuration from a string, encoding it with the given charset first.
+   */
+  public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
+    return load( new ByteArrayInputStream(s.getBytes(c)));
+  }
+
+  /**
+   * Reads a configuration from a string, encoded with the platform default charset.
+   */
+  public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
+    return load( s, Charset.defaultCharset());
+  }
+
+  /**
+   * Renders all settings; a non-null password is shown masked rather than in clear text.
+   */
+  @Override
+  public String toString() {
+    return "TaxiiConnectionConfig{" +
+            "endpoint=" + endpoint +
+            ", port=" + port +
+            ", proxy=" + proxy +
+            ", username='" + username + '\'' +
+            ", password=" + (password == null?"null" : "'******'") +
+            ", type=" + type +
+            ", allowedIndicatorTypes=" + Joiner.on(',').join(allowedIndicatorTypes)+
+            ", collection='" + collection + '\'' +
+            ", subscriptionId='" + subscriptionId + '\'' +
+            ", beginTime=" + beginTime +
+            ", table=" + table + ":" + columnFamily+
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
new file mode 100644
index 0000000..1e45f94
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.mitre.taxii.client.HttpClient;
+import org.mitre.taxii.messages.xml11.*;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * {@link TimerTask} that polls a TAXII collection, passes every returned XML
+ * element to the configured {@link Extractor} and writes the resulting
+ * enrichment key/value pairs to an HBase table.
+ *
+ * <p>Each run requests content newer than the stored begin time and then
+ * moves the begin time forward to the moment the run started.
+ */
+public class TaxiiHandler extends TimerTask {
+    private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
+
+    private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() {
+        @Override
+        protected TaxiiXmlFactory initialValue() {
+            return new TaxiiXmlFactory();
+        }
+    };
+    private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() {
+        @Override
+        protected ObjectFactory initialValue() {
+            return new ObjectFactory();
+        }
+    };
+
+    private HttpClient taxiiClient;
+    private URL endpoint;
+    private Extractor extractor;
+    private String hbaseTable;
+    private String columnFamily;
+    private Map<String, HTableInterface> connectionCache = new HashMap<>();
+    private HttpClientContext context;
+    private String collection;
+    private String subscriptionId;
+    private EnrichmentConverter converter = new EnrichmentConverter();
+    private Date beginTime;
+    private Configuration config;
+    private boolean inProgress = false;
+    private Set<String> allowedIndicatorTypes;
+
+    /**
+     * Copies the settings out of the connection configuration and initializes
+     * the TAXII client (which may perform endpoint discovery).
+     *
+     * @param connectionConfig TAXII connection and destination table settings
+     * @param extractor        turns an XML document into enrichment key/value pairs
+     * @param config           Hadoop configuration used to open HBase tables
+     * @throws Exception if the client cannot be initialized
+     */
+    public TaxiiHandler( TaxiiConnectionConfig connectionConfig
+                       , Extractor extractor
+                       , Configuration config
+                       ) throws Exception
+    {
+        LOG.info("Loading configuration: " + connectionConfig);
+        this.allowedIndicatorTypes = connectionConfig.getAllowedIndicatorTypes();
+        this.extractor = extractor;
+        this.collection = connectionConfig.getCollection();
+        this.subscriptionId = connectionConfig.getSubscriptionId();
+        hbaseTable = connectionConfig.getTable();
+        columnFamily = connectionConfig.getColumnFamily();
+        this.beginTime = connectionConfig.getBeginTime();
+        this.config = config;
+        initializeClient(connectionConfig);
+        LOG.info("Configured, starting polling " + endpoint + " for " + collection);
+    }
+
+    /**
+     * Returns the table handle for the given name, creating and caching it on first use.
+     */
+    protected synchronized HTableInterface getTable(String table) throws IOException {
+        HTableInterface ret = connectionCache.get(table);
+        if(ret == null) {
+            ret = createHTable(table);
+            connectionCache.put(table, ret);
+        }
+        return ret;
+    }
+
+    /**
+     * Opens a new table handle; protected so that subclasses can supply a different one.
+     */
+    protected synchronized HTableInterface createHTable(String tableInfo) throws IOException {
+        return new HTable(config, tableInfo);
+    }
+
+    /**
+     * Performs one poll: builds the poll request, sends it, extracts the
+     * indicators from every returned XML element and writes those whose type
+     * is allowed (or all of them when no types are configured) to HBase.
+     *
+     * <p>Does nothing if a previous run is still flagged as in progress. The
+     * begin time is advanced in the {@code finally} block, i.e. also when the
+     * poll fails.
+     *
+     * @throws RuntimeException if the request or the processing of the response fails
+     */
+    @Override
+    public void run() {
+        if(inProgress) {
+            return;
+        }
+        Date ts = new Date();
+        LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+        try {
+            inProgress = true;
+            // Prepare the message to send.
+            String sessionID = MessageHelper.generateMessageId();
+            PollRequest request = messageFactory.get().createPollRequest()
+                    .withMessageId(sessionID)
+                    .withCollectionName(collection);
+            if (subscriptionId != null) {
+                request = request.withSubscriptionID(subscriptionId);
+            } else {
+                request = request.withPollParameters(messageFactory.get().createPollParametersType());
+            }
+            if (beginTime != null) {
+                // Construct the Gregorian calendar directly: Calendar.getInstance() may
+                // return another calendar type depending on the default locale, which
+                // made the former downcast unsafe.
+                GregorianCalendar gc = new GregorianCalendar();
+                gc.setTime(beginTime);
+                try {
+                    XMLGregorianCalendar gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(gc).normalize();
+                    gTime.setFractionalSecond(null);
+                    LOG.info("Begin Time: " + gTime);
+                    request.setExclusiveBeginTimestamp(gTime);
+                } catch (DatatypeConfigurationException e) {
+                    // Without a timestamp the request is sent with no lower bound,
+                    // instead of failing later on a null dereference.
+                    LOG.error("Unable to set the begin time", e);
+                }
+            }
+
+            try {
+                PollResponse response = call(request, PollResponse.class);
+                LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
+                int numProcessed = 0;
+                long totalTimeMS = 0;
+                long timeStartedBlock = System.currentTimeMillis();
+                for (ContentBlock block : response.getContentBlocks()) {
+                    AnyMixedContentType content = block.getContent();
+                    for (Object o : content.getContent()) {
+                        numProcessed++;
+                        long timeS = System.currentTimeMillis();
+                        if (o instanceof Element) {
+                            Element element = (Element) o;
+                            String xml = getStringFromDocument(element.getOwnerDocument());
+                            // A null here means serialization failed (already logged);
+                            // skip the element rather than hand null to the extractor.
+                            if (xml != null) {
+                                if(LOG.isDebugEnabled() && Math.random() < 0.01) {
+                                    LOG.debug("Random Stix doc: " + xml);
+                                }
+                                for (LookupKV<EnrichmentKey, EnrichmentValue> kv : extractor.extract(xml)) {
+                                    if(allowedIndicatorTypes.isEmpty()
+                                    || allowedIndicatorTypes.contains(kv.getKey().type)
+                                      )
+                                    {
+                                        kv.getValue().getMetadata().put("source_type", "taxii");
+                                        kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
+                                        kv.getValue().getMetadata().put("taxii_collection", collection);
+                                        Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue());
+                                        HTableInterface table = getTable(hbaseTable);
+                                        table.put(p);
+                                        LOG.info("Found Threat Intel: " + kv.getKey() + " => " + kv.getValue());
+                                    }
+                                }
+                            }
+                        }
+                        totalTimeMS += System.currentTimeMillis() - timeS;
+                    }
+                    // NOTE(review): the counter is only inspected once per content block,
+                    // so this fires only when a block ends exactly on 99, 199, ... items.
+                    if( (numProcessed + 1) % 100 == 0) {
+                        // Average over the items actually timed; numProcessed is at least 99
+                        // here, so the division cannot be by zero.
+                        LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + totalTimeMS / numProcessed);
+                        timeStartedBlock = System.currentTimeMillis();
+                        totalTimeMS = 0;
+                        numProcessed = 0;
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+                throw new RuntimeException("Unable to make request", e);
+            }
+        }
+        finally {
+            inProgress = false;
+            beginTime = ts;
+        }
+    }
+
+    /**
+     * Serializes a DOM document to its XML text.
+     *
+     * @param doc the document to serialize
+     * @return the XML text, or {@code null} if the transformation fails
+     */
+    public String getStringFromDocument(Document doc)
+    {
+        try
+        {
+            DOMSource domSource = new DOMSource(doc);
+            StringWriter writer = new StringWriter();
+            StreamResult result = new StreamResult(writer);
+            TransformerFactory tf = TransformerFactory.newInstance();
+            Transformer transformer = tf.newTransformer();
+            transformer.transform(domSource, result);
+            return writer.toString();
+        }
+        catch(TransformerException ex)
+        {
+            // Report through the class logger instead of printing to stderr.
+            LOG.error("Unable to serialize document to XML", ex);
+            return null;
+        }
+    }
+
+    /** Sends the request to this handler's endpoint with its client and context. */
+    private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
+        return call(taxiiClient, endpoint.toURI(), request, context, responseClazz);
+    }
+
+    /**
+     * Creates the HTTP context and client, and determines the endpoint to poll:
+     * the discovered poll endpoint for {@link ConnectionType#DISCOVER}, otherwise
+     * the configured endpoint itself.
+     */
+    private void initializeClient(TaxiiConnectionConfig connectionConfig) throws Exception {
+        LOG.info("Initializing client..");
+        if(context == null) {
+            context = createContext(connectionConfig.getEndpoint(), connectionConfig.getUsername(), connectionConfig.getPassword(), connectionConfig.getPort());
+        }
+        URL pollEndpoint = connectionConfig.getEndpoint();
+        if(connectionConfig.getType() == ConnectionType.DISCOVER) {
+            LOG.info("Discovering endpoint");
+            pollEndpoint = discoverPollingClient(connectionConfig.getProxy(), pollEndpoint, connectionConfig.getUsername(), connectionConfig.getPassword(), context, collection).pollEndpoint;
+            LOG.info("Discovered endpoint as " + pollEndpoint);
+        }
+        // Assign for every connection type; previously the field stayed null unless
+        // discovery ran, so a directly configured poll endpoint was never used.
+        this.endpoint = pollEndpoint;
+        taxiiClient = buildClient(connectionConfig.getProxy(), connectionConfig.getUsername(), connectionConfig.getPassword());
+    }
+
+    /** Endpoints and collection names gathered during discovery. */
+    private static class DiscoveryResults {
+        URL pollEndpoint;
+        URL collectionManagementEndpoint;
+        List<String> collections = new ArrayList<>();
+    }
+
+    /**
+     * Queries the discovery service for the available poll and collection
+     * management endpoints. When no collection is given, the available
+     * collections are logged and the JVM exits.
+     *
+     * @throws RuntimeException if no poll endpoint is advertised, or if the
+     *         collections have to be listed and no collection management
+     *         endpoint is advertised
+     */
+    private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+
+        DiscoveryResults results = new DiscoveryResults();
+
+        HttpClient discoverClient = buildClient(proxy, username, password);
+        // Prepare the message to send.
+        DiscoveryRequest discoveryRequest = messageFactory.get().createDiscoveryRequest()
+                .withMessageId(MessageHelper.generateMessageId());
+        DiscoveryResponse discoveryResponse = call(discoverClient, endpoint.toURI(), discoveryRequest, context, DiscoveryResponse.class);
+        for (ServiceInstanceType serviceInstance : discoveryResponse.getServiceInstances()) {
+            if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
+                results.pollEndpoint = new URL(serviceInstance.getAddress());
+            }
+            else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
+                results.collectionManagementEndpoint= new URL(serviceInstance.getAddress());
+            }
+        }
+        if (results.pollEndpoint == null) {
+            throw new RuntimeException("Unable to discover a poll TAXII feed");
+        }
+
+        if(defaultCollection == null) {
+            // get collections
+            if (results.collectionManagementEndpoint == null) {
+                // Fail with a clear message instead of a null dereference below.
+                throw new RuntimeException("Unable to discover a collection management TAXII feed");
+            }
+            HttpClient collectionClient = buildClient(proxy, username, password);
+            CollectionInformationRequest collectionRequest = messageFactory.get().createCollectionInformationRequest()
+                    .withMessageId(MessageHelper.generateMessageId());
+            CollectionInformationResponse collectionResponse = call(collectionClient, results.collectionManagementEndpoint.toURI(), collectionRequest, context, CollectionInformationResponse.class);
+            LOG.info("Unable to find the default collection; available collections are:");
+            for(CollectionRecordType c : collectionResponse.getCollections()) {
+                LOG.info(c.getCollectionName());
+                results.collections.add(c.getCollectionName());
+            }
+            // NOTE(review): terminates the whole JVM from inside a helper; kept because
+            // callers may rely on the "list collections and quit" behavior.
+            System.exit(0);
+        }
+        return results;
+    }
+
+    /**
+     * Builds an HTTP context carrying basic-auth credentials for the endpoint's
+     * host and the given port.
+     *
+     * @return the context, or {@code null} when username or password is missing
+     */
+    private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
+        if (username == null || password == null) {
+            return null;
+        }
+        HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
+
+        CredentialsProvider credsProvider = new BasicCredentialsProvider();
+        credsProvider.setCredentials(
+                new AuthScope(target.getHostName(), target.getPort()),
+                new UsernamePasswordCredentials(username, password));
+
+        // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
+        AuthCache authCache = new BasicAuthCache();
+        authCache.put(target, new BasicScheme());
+
+        // Add AuthCache to the execution context
+        HttpClientContext context = HttpClientContext.create();
+        context.setCredentialsProvider(credsProvider);
+        context.setAuthCache(authCache);
+        return context;
+    }
+
+
+    /**
+     * Sends a request to a TAXII service and casts the response to the expected type.
+     *
+     * @throws RuntimeException if the service answers with a different message
+     *         type; the unexpected response is marshalled into the message
+     */
+    public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
+            , URI endpoint
+            , REQUEST_T request
+            , HttpClientContext context
+            , Class<RESPONSE_T> responseClazz
+    ) throws JAXBException, IOException {
+        // Call the service
+        Object responseObj =  taxiiClient.callTaxiiService(endpoint, request, context);
+        LOG.info("Request made : " + request.getClass().getCanonicalName() + " => " + responseObj.getClass().getCanonicalName() + " (expected " + responseClazz.getCanonicalName() + ")");
+        try {
+            return responseClazz.cast(responseObj);
+        }
+        catch(ClassCastException cce) {
+            TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+            String resp = taxiiXml.marshalToString(responseObj, true);
+            String msg = "Didn't return the response we expected: " + responseObj.getClass() + " \n" + resp;
+            LOG.error(msg, cce);
+            throw new RuntimeException(msg, cce);
+        }
+    }
+
+    /**
+     * Builds a TAXII client on top of a pooled Apache HTTP client, optionally
+     * routed through a proxy.
+     *
+     * @throws IllegalArgumentException if only one of username and password is given
+     */
+    private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
+    {
+        HttpClient client = new HttpClient(); // Start with a default TAXII HTTP client.
+
+        // Create an Apache HttpClientBuilder to be customized by the command line arguments.
+        HttpClientBuilder builder = HttpClientBuilder.create().useSystemProperties();
+
+        // Proxy
+        if (proxy != null) {
+            HttpHost proxyHost = new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol());
+            builder.setProxy(proxyHost);
+        }
+
+        // Basic authentication. User & Password
+        if (username != null ^ password != null) {
+            throw new IllegalArgumentException("'username' and 'password' arguments are required to appear together.");
+        }
+
+
+        // from:  http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
+        // NOTE(review): self-signed certificates are trusted here, which weakens TLS
+        // server authentication; confirm this is intended outside of testing.
+        SSLContextBuilder ssbldr = new SSLContextBuilder();
+        ssbldr.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(ssbldr.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+
+        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+                .register("http", new PlainConnectionSocketFactory())
+                .register("https", sslsf)
+                .build();
+
+
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
+        cm.setMaxTotal(20);//max connection
+
+        // NOTE(review): this is a JVM-wide system property, so it affects every
+        // TLS connection in the process, not just this client.
+        System.setProperty("jsse.enableSNIExtension", "false"); //""
+        CloseableHttpClient httpClient = builder
+                .setSSLSocketFactory(sslsf)
+                .setConnectionManager(cm)
+                .build();
+
+        client.setHttpclient(httpClient);
+        return client;
+    }
+
+    /**
+     * Scratch entry point: only assembles a sample configuration; the calls that
+     * would use it are commented out.
+     */
+    public static void main(String... argv) throws Exception {
+        URL endpoint = new URL("http://hailataxii.com/taxii-discovery-service");
+        String username = "guest";
+        String password = "guest";
+        TaxiiConnectionConfig config = new TaxiiConnectionConfig();
+        config = config.withConnectionType(ConnectionType.DISCOVER)
+                       .withEndpoint(endpoint)
+                       .withUsername(username)
+                       .withCollection("guest.Abuse_ch")
+                       .withPassword(password);
+        //TaxiiHandler handler = new TaxiiHandler(config, null);
+        //handler.run();
+        //discoverPollingClient(null, endpoint, username, password, context);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
new file mode 100644
index 0000000..712fcf3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.EnrichmentConfig;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.text.*;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+
+public class TaxiiLoader {
+  private static abstract class OptionHandler implements Function<String, Option> {}
+  private enum TaxiiOptions {
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+        o.setArgName("JSON_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,CONNECTION_CONFIG("c", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "taxii_connection_config", true, "The JSON config file to configure the connection");
+        o.setArgName("config_file");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,TIME_BETWEEN_POLLS("p", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "time_between_polls", true, "The time between polls (in ms)");
+        o.setArgName("MS");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,BEGIN_TIME("b", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "begin_time", true, "Start time to poll the Taxii server (all data from that point will be gathered in the first pull).");
+        o.setArgName(DATE_FORMAT.toPattern());
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,LOG4J_PROPERTIES("l", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+        o.setArgName("FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,ENRICHMENT_CONFIG("n", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "enrichment_config", true
+                , "JSON Document describing the enrichment configuration details." +
+                "  This is used to associate an enrichment type with a field type in zookeeper."
+        );
+        o.setArgName("JSON_FILE");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ;
+    Option option;
+    String shortCode;
+    TaxiiOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(TaxiiOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "TaxiiLoader", getOptions());
+    }
+
+    public static Options getOptions() {
+      Options ret = new Options();
+      for(TaxiiOptions o : TaxiiOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+  public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+  public static final long ONE_HR_IN_MS = 60*60*1000;
+  public static final long DEFAULT_TIME_BETWEEN_POLLS = ONE_HR_IN_MS;
+
+
+  public static void main(String... argv) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+    CommandLine cli = TaxiiOptions.parse(new PosixParser(), otherArgs);
+    if(TaxiiOptions.LOG4J_PROPERTIES.has(cli)) {
+      PropertyConfigurator.configure(TaxiiOptions.LOG4J_PROPERTIES.get(cli));
+    }
+    ExtractorHandler handler = ExtractorHandler.load(FileUtils.readFileToString(new File(TaxiiOptions.EXTRACTOR_CONFIG.get(cli))));
+    Extractor e = handler.getExtractor();
+    EnrichmentConfig enrichmentConfig = null;
+    if(TaxiiOptions.ENRICHMENT_CONFIG.has(cli)) {
+      enrichmentConfig = JSONUtils.INSTANCE.load( new File(TaxiiOptions.ENRICHMENT_CONFIG.get(cli))
+              , EnrichmentConfig.class
+      );
+      enrichmentConfig.updateSensorConfigs();
+    }
+
+    Timer timer = new Timer();
+    if(e instanceof StixExtractor) {
+      StixExtractor extractor = (StixExtractor)e;
+      TaxiiConnectionConfig connectionConfig = TaxiiConnectionConfig.load(FileUtils.readFileToString(new File(TaxiiOptions.CONNECTION_CONFIG.get(cli))));
+      if(TaxiiOptions.BEGIN_TIME.has(cli)) {
+        Date d = DATE_FORMAT.parse(TaxiiOptions.BEGIN_TIME.get(cli));
+        connectionConfig.withBeginTime(d);
+      }
+      long timeBetween = DEFAULT_TIME_BETWEEN_POLLS;
+      if(TaxiiOptions.TIME_BETWEEN_POLLS.has(cli)) {
+        timeBetween = Long.parseLong(TaxiiOptions.TIME_BETWEEN_POLLS.get(cli));
+      }
+      timer.scheduleAtFixedRate(new TaxiiHandler(connectionConfig, extractor, conf), 0, timeBetween);
+    }
+    else {
+      throw new IllegalStateException("Extractor must be a STIX Extractor");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
deleted file mode 100644
index 1e95507..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-public enum ConnectionType {
-   POLL, DISCOVER;
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
deleted file mode 100644
index ddf542e..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-public class TableInfo {
-    private String tableName;
-    private String columnFamily;
-    public TableInfo(String s) {
-        Iterable<String> i = Splitter.on(":").split(s);
-        if(Iterables.size(i) != 2) {
-            throw new IllegalStateException("Malformed table:cf => " + s);
-        }
-        tableName = Iterables.getFirst(i, null);
-        columnFamily = Iterables.getLast(i);
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public String getColumnFamily() {
-        return columnFamily;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        TableInfo tableInfo = (TableInfo) o;
-
-        if (getTableName() != null ? !getTableName().equals(tableInfo.getTableName()) : tableInfo.getTableName() != null)
-            return false;
-        return getColumnFamily() != null ? getColumnFamily().equals(tableInfo.getColumnFamily()) : tableInfo.getColumnFamily() == null;
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = getTableName() != null ? getTableName().hashCode() : 0;
-        result = 31 * result + (getColumnFamily() != null ? getColumnFamily().hashCode() : 0);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "TableInfo{" +
-                "tableName='" + tableName + '\'' +
-                ", columnFamily='" + columnFamily + '\'' +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
deleted file mode 100644
index dab8f0c..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaxiiConnectionConfig {
-    final static ObjectMapper _mapper = new ObjectMapper();
-    private URL endpoint;
-    private int port = 443;
-    private URL proxy;
-    private String username;
-    private String password;
-    private ConnectionType type;
-    private String collection = "default";
-    private String subscriptionId = null;
-    private Date beginTime;
-    private Map<String, TableInfo> tableMap;
-    public TaxiiConnectionConfig withTableMap(Map<String, TableInfo> tableMap) {
-        this.tableMap = tableMap;
-        return this;
-    }
-    public TaxiiConnectionConfig withBeginTime(Date time) {
-        this.beginTime = time;
-        return this;
-    }
-    public TaxiiConnectionConfig withSubscriptionId(String subId) {
-        this.subscriptionId = subId;
-        return this;
-    }
-    public TaxiiConnectionConfig withCollection(String collection) {
-        this.collection = collection;
-        return this;
-    }
-
-    public TaxiiConnectionConfig withPort(int port) {
-        this.port = port;
-        return this;
-    }
-    public TaxiiConnectionConfig withEndpoint(URL endpoint) {
-        this.endpoint = endpoint;
-        return this;
-    }
-    public TaxiiConnectionConfig withProxy(URL proxy) {
-        this.proxy = proxy;
-        return this;
-    }
-    public TaxiiConnectionConfig withUsername(String username) {
-        this.username = username;
-        return this;
-    }
-    public TaxiiConnectionConfig withPassword(String password) {
-        this.password = password;
-        return this;
-    }
-    public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
-        this.type= type;
-        return this;
-    }
-
-    public void setEndpoint(String endpoint) throws MalformedURLException {
-        this.endpoint = new URL(endpoint);
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    public void setProxy(String proxy) throws MalformedURLException {
-        this.proxy = new URL(proxy);
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public void setType(ConnectionType type) {
-        this.type = type;
-    }
-
-    public void setCollection(String collection) {
-        this.collection = collection;
-    }
-
-    public void setSubscriptionId(String subscriptionId) {
-        this.subscriptionId = subscriptionId;
-    }
-
-    public void setBeginTime(String beginTime) throws ParseException {
-        SimpleDateFormat sdf = (SimpleDateFormat)DateFormat.getDateInstance(DateFormat.MEDIUM);
-        this.beginTime = sdf.parse(beginTime);
-    }
-
-    public void setTableMap(Map<String, String> tableMap) {
-        this.tableMap = new HashMap<>();
-        for(Map.Entry<String, String> kv : tableMap.entrySet()) {
-            this.tableMap.put(kv.getKey(), new TableInfo(kv.getValue()));
-        }
-    }
-
-    public Map<String, TableInfo> getTableMap() {
-        return tableMap;
-    }
-
-    public Date getBeginTime() {
-        return beginTime;
-    }
-    public int getPort() {
-        return port;
-    }
-    public URL getEndpoint() {
-        return endpoint;
-    }
-
-    public URL getProxy() {
-        return proxy;
-    }
-
-    public String getUsername() {
-        return username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public ConnectionType getType() {
-        return type;
-    }
-
-    public String getCollection() {
-        return collection;
-    }
-    public String getSubscriptionId() {
-        return subscriptionId;
-    }
-    public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
-        TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
-        return ret;
-    }
-    public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
-        return load( new ByteArrayInputStream(s.getBytes(c)));
-    }
-    public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
-        return load( s, Charset.defaultCharset());
-    }
-
-    @Override
-    public String toString() {
-        return "TaxiiConnectionConfig{" +
-                "endpoint=" + endpoint +
-                ", port=" + port +
-                ", proxy=" + proxy +
-                ", username='" + username + '\'' +
-                ", password=" + (password == null?"null" : "'******'") +
-                ", type=" + type +
-                ", collection='" + collection + '\'' +
-                ", subscriptionId='" + subscriptionId + '\'' +
-                ", beginTime=" + beginTime +
-                ", tableMap=" + tableMap +
-                '}';
-    }
-}