You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/07 14:56:37 UTC
[4/6] incubator-metron git commit: METRON-93: Generalize the HBase
threat intel infrastructure to support enrichments closes
apache/incubator-metron#64
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index 233550b..ae839d2 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -18,10 +18,9 @@
package org.apache.metron.dataloads.extractor.stix.types;
import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.StringObjectPropertyType;
import org.mitre.cybox.objects.DomainName;
import org.mitre.cybox.objects.DomainNameTypeEnum;
@@ -30,30 +29,49 @@ import java.io.IOException;
import java.util.*;
public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
- EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
- public DomainHandler() {
- super(DomainName.class);
- }
+ public static final String TYPE_CONFIG = "stix_domain_type";
+ EnumSet<DomainNameTypeEnum> SUPPORTED_TYPES = EnumSet.of(DomainNameTypeEnum.FQDN);
+ public DomainHandler() {
+ super(DomainName.class);
+ }
- @Override
- public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
- List<LookupKV> ret = new ArrayList<>();
- final DomainNameTypeEnum domainType = type.getType();
- if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
- StringObjectPropertyType value = type.getValue();
- for (String token : StixExtractor.split(value)) {
- LookupKV results = new LookupKV(new ThreatIntelKey(token)
- , new ThreatIntelValue(
- new HashMap<String, String>() {{
- put("source-type", "STIX");
- put("indicator-type", type.getClass().getSimpleName() + ":" + DomainNameTypeEnum.FQDN);
- put("source", type.toXMLString());
- }}
- )
- );
- ret.add(results);
- }
- }
- return ret;
+ @Override
+ public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+ List<LookupKV> ret = new ArrayList<>();
+ String typeStr = getType();
+ if(config != null) {
+ Object o = config.get(TYPE_CONFIG);
+ if(o != null) {
+ typeStr = o.toString();
+ }
+ }
+ final DomainNameTypeEnum domainType = type.getType();
+ if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
+ StringObjectPropertyType value = type.getValue();
+ for (String token : StixExtractor.split(value)) {
+ final String indicatorType = typeStr + ":" + DomainNameTypeEnum.FQDN;
+ LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
+ , new EnrichmentValue(
+ new HashMap<String, String>() {{
+ put("source-type", "STIX");
+ put("indicator-type", indicatorType);
+ put("source", type.toXMLString());
+ }}
+ )
+ );
+ ret.add(results);
+ }
+ }
+ return ret;
+ }
+ @Override
+ public List<String> getPossibleTypes() {
+ String typeStr = getType();
+ List<String> ret = new ArrayList<>();
+ for(DomainNameTypeEnum e : SUPPORTED_TYPES)
+ {
+ ret.add(typeStr + ":" + e);
}
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index 702c440..ab02440 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -18,11 +18,11 @@
package org.apache.metron.dataloads.extractor.stix.types;
+import com.google.common.collect.ImmutableList;
import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.StringObjectPropertyType;
import org.mitre.cybox.objects.Hostname;
@@ -33,25 +33,38 @@ import java.util.List;
import java.util.Map;
public class HostnameHandler extends AbstractObjectTypeHandler<Hostname>{
- public HostnameHandler() {
- super(Hostname.class);
- }
+ public static final String TYPE_CONFIG = "stix_hostname_type";
+ public HostnameHandler() {
+ super(Hostname.class);
+ }
- @Override
- public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
- StringObjectPropertyType value = type.getHostnameValue();
- List<LookupKV> ret = new ArrayList<>();
- for(String token : StixExtractor.split(value)) {
- LookupKV results = new LookupKV(new ThreatIntelKey(token)
- , new ThreatIntelValue(new HashMap<String, String>() {{
- put("source-type", "STIX");
- put("indicator-type", type.getClass().getSimpleName());
- put("source", type.toXMLString());
- }}
- )
- );
- ret.add(results);
- }
- return ret;
+ @Override
+ public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
+ StringObjectPropertyType value = type.getHostnameValue();
+ String typeStr = getType();
+ if(config != null) {
+ Object o = config.get(TYPE_CONFIG);
+ if(o != null) {
+ typeStr = o.toString();
+ }
+ }
+ List<LookupKV> ret = new ArrayList<>();
+ for(String token : StixExtractor.split(value)) {
+ final String indicatorType = typeStr;
+ LookupKV results = new LookupKV(new EnrichmentKey(indicatorType, token)
+ , new EnrichmentValue(new HashMap<String, String>() {{
+ put("source-type", "STIX");
+ put("indicator-type", indicatorType);
+ put("source", type.toXMLString());
+ }}
+ )
+ );
+ ret.add(results);
}
+ return ret;
+ }
+ @Override
+ public List<String> getPossibleTypes() {
+ return ImmutableList.of(getType());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
index e5a5296..57d72ff 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -17,15 +17,15 @@
*/
package org.apache.metron.dataloads.extractor.stix.types;
-import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.ObjectPropertiesType;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
- Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
- Class<T> getTypeClass();
+ Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
+ Class<T> getTypeClass();
+ List<String> getPossibleTypes();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
index 04714d9..5baa3a5 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -26,8 +26,6 @@ import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
import org.apache.metron.hbase.converters.HbaseConverter;
import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
import java.io.IOException;
@@ -39,7 +37,6 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
public static final String CONVERTER_KEY = "bl_converter";
Extractor extractor = null;
String columnFamily = null;
- Long lastSeen = null;
HbaseConverter converter;
@Override
public void setup(Context context) throws IOException,
@@ -61,7 +58,6 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
String configStr = configuration.get(CONFIG_KEY);
extractor = ExtractorHandler.load(configStr).getExtractor();
columnFamily = configuration.get(COLUMN_FAMILY_KEY);
- lastSeen = Long.parseLong(configuration.get(LAST_SEEN_KEY));
try {
converter = (HbaseConverter) Class.forName(configuration.get(CONVERTER_KEY)).newInstance();
} catch (InstantiationException e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
index bf33eed..b418a80 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -48,7 +48,7 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
}
try {
tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
- } catch (Exception e) {
+ } catch (Throwable e) {
throw new IllegalStateException("Unable to load the accesstrackers from the directory", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
new file mode 100644
index 0000000..816311f
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.cli.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.enrichment.EnrichmentConfig;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+public class SimpleEnrichmentFlatFileLoader {
+ // Factory type used by LoadOptions: each enum constant supplies one of these, and the
+ // LoadOptions constructor calls apply(shortCode) on it to build the commons-cli Option.
+ private static abstract class OptionHandler implements Function<String, Option> {}
+ /**
+ * Command line options understood by the loader.
+ * Each constant pairs a short option code with a handler that builds the
+ * corresponding commons-cli Option (long name, argument name, required flag, help text).
+ */
+ public static enum LoadOptions {
+ // -h / --help: flag only, takes no argument
+ HELP("h", new OptionHandler() {
+
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ return new Option(s, "help", false, "Generate Help screen");
+ }
+ })
+ // -t / --hbase_table: required, target table
+ ,HBASE_TABLE("t", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
+ o.setArgName("TABLE");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ // -c / --hbase_cf: required, target column family
+ ,HBASE_CF("c", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
+ o.setArgName("CF");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ // -e / --extractor_config: required, path to the extractor JSON
+ ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+ o.setArgName("JSON_FILE");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ // -n / --enrichment_config: optional, path to the enrichment JSON
+ ,ENRICHMENT_CONFIG("n", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "enrichment_config", true
+ , "JSON Document describing the enrichment configuration details." +
+ " This is used to associate an enrichment type with a field type in zookeeper."
+ );
+ o.setArgName("JSON_FILE");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ // -l / --log4j: optional, log4j properties file
+ ,LOG4J_PROPERTIES("l", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+ o.setArgName("FILE");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ // -i / --input: required, the input path
+ ,INPUT("i", new OptionHandler() {
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "input", true, "The CSV File to load");
+ o.setArgName("FILE");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ ;
+ // The Option is built once, when the enum constant is constructed, and reused by getOptions().
+ Option option;
+ String shortCode;
+ LoadOptions(String shortCode, OptionHandler optionHandler) {
+ this.shortCode = shortCode;
+ this.option = optionHandler.apply(shortCode);
+ }
+
+ /** @return true if this option was supplied on the parsed command line. */
+ public boolean has(CommandLine cli) {
+ return cli.hasOption(shortCode);
+ }
+
+ /** @return the value supplied for this option, as reported by CommandLine.getOptionValue. */
+ public String get(CommandLine cli) {
+ return cli.getOptionValue(shortCode);
+ }
+
+ /**
+ * Parses the arguments against all LoadOptions.
+ * Prints the help screen and exits with status 0 when the help option is present.
+ * On a ParseException it prints the arguments, the stack trace and the help screen
+ * to the console and exits with status -1.
+ */
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+ try {
+ CommandLine cli = parser.parse(getOptions(), args);
+ if(HELP.has(cli)) {
+ printHelp();
+ System.exit(0);
+ }
+ return cli;
+ } catch (ParseException e) {
+ System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+ e.printStackTrace(System.err);
+ printHelp();
+ System.exit(-1);
+ // only here to satisfy the compiler; System.exit above does not normally return
+ return null;
+ }
+ }
+
+ /** Prints the usage screen for all options via commons-cli's HelpFormatter. */
+ public static void printHelp() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
+ }
+
+ /** @return a new Options holding the Option of every LoadOptions constant, in declaration order. */
+ public static Options getOptions() {
+ Options ret = new Options();
+ for(LoadOptions o : LoadOptions.values()) {
+ ret.addOption(o.option);
+ }
+ return ret;
+ }
+ }
+ /**
+  * Collects every non-directory file reachable from {@code root}.
+  * If {@code root} is not a directory it is returned as the only element,
+  * without checking that it exists. Otherwise the tree is walked
+  * iteratively with an explicit stack, so the order of the result is unspecified.
+  *
+  * @param root a file or directory to start from
+  * @return the files found; directories themselves are never included
+  * @throws IllegalStateException if a directory in the tree cannot be listed
+  */
+ public static List<File> getFiles(File root) {
+   if(!root.isDirectory()) {
+     return ImmutableList.of(root);
+   }
+   List<File> ret = new ArrayList<>();
+   Stack<File> stack = new Stack<File>();
+   stack.push(root);
+   while(!stack.isEmpty()) {
+     File f = stack.pop();
+     if(f.isDirectory()) {
+       File[] children = f.listFiles();
+       // listFiles() returns null (not an empty array) on an I/O error or when the
+       // directory is unreadable; fail with a clear message instead of a bare NPE.
+       if(children == null) {
+         throw new IllegalStateException("Unable to list the contents of directory: " + f);
+       }
+       for(File child : children) {
+         stack.push(child);
+       }
+     }
+     else {
+       ret.add(f);
+     }
+   }
+   return ret;
+ }
+
+ /**
+ * Creates the table provider that main() uses to obtain the HBase table handle.
+ *
+ * @return a new HTableProvider
+ */
+ public HTableProvider getProvider() {
+ return new HTableProvider();
+ }
+
+ /**
+  * Runs the extractor over one unit of input and converts every resulting
+  * key/value pair into an HBase Put for the given column family.
+  *
+  * @param line the text handed to the extractor
+  * @param extractor produces the lookup key/value pairs
+  * @param cf column family passed to the converter
+  * @param converter turns a key/value pair into a Put
+  * @return one Put per extracted pair, in extraction order
+  * @throws IOException if extraction or conversion fails
+  */
+ public List<Put> extract( String line
+                         , Extractor extractor
+                         , String cf
+                         , HbaseConverter converter
+                         ) throws IOException
+ {
+   List<Put> puts = new ArrayList<>();
+   for(LookupKV entry : extractor.extract(line)) {
+     puts.add(converter.toPut(cf, entry.getKey(), entry.getValue()));
+   }
+   return puts;
+ }
+
+
+ /**
+  * Extracts the contents of one input file and writes the resulting Puts to the table.
+  *
+  * @param inputFile the file to read
+  * @param extractor produces the lookup key/value pairs
+  * @param table destination table
+  * @param cf column family to write into
+  * @param converter turns a key/value pair into a Put
+  * @param lineByLine if true each line is extracted and written separately;
+  *                   if false the whole file is handed to the extractor at once
+  * @throws IOException if the file cannot be read or the table write fails
+  */
+ public void loadFile( File inputFile
+                     , Extractor extractor
+                     , HTableInterface table
+                     , String cf
+                     , HbaseConverter converter
+                     , boolean lineByLine
+                     ) throws IOException
+ {
+   if(!lineByLine) {
+     table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter));
+   }
+   else {
+     // try-with-resources so the reader is closed even when extraction or the put fails
+     try (BufferedReader br = new BufferedReader(new FileReader(inputFile))) {
+       for(String line = null;(line = br.readLine()) != null;) {
+         table.put(extract(line, extractor, cf, converter));
+       }
+     }
+   }
+ }
+ /**
+  * Command line entry point: parses the options, loads the extractor (and optional
+  * enrichment) configuration, writes every input file to the HBase table and finally
+  * pushes the enrichment configuration, if one was given.
+  *
+  * @param argv Hadoop generic options followed by the loader's own options
+  * @throws Exception on any configuration, I/O or HBase failure
+  */
+ public static void main(String... argv) throws Exception {
+   Configuration conf = HBaseConfiguration.create();
+   String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+   CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+   if(LoadOptions.LOG4J_PROPERTIES.has(cli)) {
+     PropertyConfigurator.configure(LoadOptions.LOG4J_PROPERTIES.get(cli));
+   }
+   ExtractorHandler handler = ExtractorHandler.load(
+           FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+   );
+   // anything other than the whole-file input format is processed one line at a time
+   boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
+   Extractor e = handler.getExtractor();
+   EnrichmentConfig enrichmentConfig = null;
+   if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
+     enrichmentConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
+             , EnrichmentConfig.class
+     );
+   }
+   HbaseConverter converter = new EnrichmentConverter();
+   List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
+   SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
+   HTableInterface table = loader.getProvider()
+           .getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+
+   try {
+     for (File f : inputFiles) {
+       loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine);
+     }
+   }
+   finally {
+     // release the table handle (and any pending buffered changes) even if a load fails
+     table.close();
+   }
+   if(enrichmentConfig != null) {
+     enrichmentConfig.updateSensorConfigs();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
new file mode 100644
index 0000000..77d1698
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/ConnectionType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+/**
+ * The kind of TAXII connection to make; stored in TaxiiConnectionConfig's {@code type} field.
+ * NOTE(review): POLL and DISCOVER presumably correspond to the TAXII poll and discovery
+ * services - confirm against TaxiiHandler.
+ */
+public enum ConnectionType {
+ POLL, DISCOVER;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
new file mode 100644
index 0000000..6bbf8e3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TableInfo.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * An HBase table name paired with a column family, parsed from a
+ * {@code table:cf} string. Instances are immutable and compare by value.
+ */
+public class TableInfo {
+  private final String tableName;
+  private final String columnFamily;
+
+  /**
+   * Parses a {@code table:cf} specification.
+   *
+   * @param s text containing exactly one ':' separating table name and column family;
+   *          either side may be empty
+   * @throws IllegalStateException if {@code s} does not contain exactly one ':'
+   * @throws NullPointerException if {@code s} is null
+   */
+  public TableInfo(String s) {
+    // Single scan instead of re-running a lazy splitter for size, first and last.
+    int sep = s.indexOf(':');
+    if(sep < 0 || sep != s.lastIndexOf(':')) {
+      throw new IllegalStateException("Malformed table:cf => " + s);
+    }
+    tableName = s.substring(0, sep);
+    columnFamily = s.substring(sep + 1);
+  }
+
+  /** @return the part before the ':' */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the part after the ':' */
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  /** Two instances are equal when they are of the same class and both parts match. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableInfo tableInfo = (TableInfo) o;
+
+    if (getTableName() != null ? !getTableName().equals(tableInfo.getTableName()) : tableInfo.getTableName() != null)
+      return false;
+    return getColumnFamily() != null ? getColumnFamily().equals(tableInfo.getColumnFamily()) : tableInfo.getColumnFamily() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getTableName() != null ? getTableName().hashCode() : 0;
+    result = 31 * result + (getColumnFamily() != null ? getColumnFamily().hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TableInfo{" +
+            "tableName='" + tableName + '\'' +
+            ", columnFamily='" + columnFamily + '\'' +
+            '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
new file mode 100644
index 0000000..678f98b
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiConnectionConfig.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Settings for a TAXII connection and the HBase table it loads into.
+ * An instance can be built fluently with the {@code with*} methods or read from
+ * JSON through {@link #load(InputStream)}, which binds through the {@code set*} methods.
+ * The string-typed setters for endpoint, proxy and begin time convert their argument.
+ * Defaults: port 443, collection "default", no allowed indicator types.
+ */
+public class TaxiiConnectionConfig {
+  final static ObjectMapper _mapper = new ObjectMapper();
+  private URL endpoint;
+  private int port = 443;
+  private URL proxy;
+  private String username;
+  private String password;
+  private ConnectionType type;
+  private String collection = "default";
+  private String subscriptionId = null;
+  private Date beginTime;
+  private String table;
+  private String columnFamily;
+  private Set<String> allowedIndicatorTypes = new HashSet<String>();
+
+  /**
+   * Replaces the allowed indicator types with a copy of the given list.
+   *
+   * @param indicatorTypes the types to allow; null is treated as an empty list
+   * @return this, for chaining
+   */
+  public TaxiiConnectionConfig withAllowedIndicatorTypes(List<String> indicatorTypes) {
+    // A null list (e.g. an explicit JSON null) previously caused a NullPointerException here.
+    allowedIndicatorTypes = indicatorTypes == null ? new HashSet<String>()
+                                                   : new HashSet<String>(indicatorTypes);
+    return this;
+  }
+
+  public TaxiiConnectionConfig withTable(String table) {
+    this.table = table;
+    return this;
+  }
+  public TaxiiConnectionConfig withColumnFamily(String cf) {
+    this.columnFamily = cf;
+    return this;
+  }
+  public TaxiiConnectionConfig withBeginTime(Date time) {
+    this.beginTime = time;
+    return this;
+  }
+  public TaxiiConnectionConfig withSubscriptionId(String subId) {
+    this.subscriptionId = subId;
+    return this;
+  }
+  public TaxiiConnectionConfig withCollection(String collection) {
+    this.collection = collection;
+    return this;
+  }
+
+  public TaxiiConnectionConfig withPort(int port) {
+    this.port = port;
+    return this;
+  }
+  public TaxiiConnectionConfig withEndpoint(URL endpoint) {
+    this.endpoint = endpoint;
+    return this;
+  }
+  public TaxiiConnectionConfig withProxy(URL proxy) {
+    this.proxy = proxy;
+    return this;
+  }
+  public TaxiiConnectionConfig withUsername(String username) {
+    this.username = username;
+    return this;
+  }
+  public TaxiiConnectionConfig withPassword(String password) {
+    this.password = password;
+    return this;
+  }
+  public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
+    this.type= type;
+    return this;
+  }
+
+  /**
+   * @param endpoint the endpoint as URL text
+   * @throws MalformedURLException if the text is not a valid URL
+   */
+  public void setEndpoint(String endpoint) throws MalformedURLException {
+    this.endpoint = new URL(endpoint);
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  /**
+   * @param proxy the proxy as URL text
+   * @throws MalformedURLException if the text is not a valid URL
+   */
+  public void setProxy(String proxy) throws MalformedURLException {
+    this.proxy = new URL(proxy);
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setType(ConnectionType type) {
+    this.type = type;
+  }
+
+  public void setCollection(String collection) {
+    this.collection = collection;
+  }
+
+  public void setSubscriptionId(String subscriptionId) {
+    this.subscriptionId = subscriptionId;
+  }
+
+  /**
+   * Parses the begin time using the default locale's MEDIUM date style.
+   *
+   * @param beginTime the date text
+   * @throws ParseException if the text does not match that style
+   */
+  public void setBeginTime(String beginTime) throws ParseException {
+    // Use the DateFormat as returned: getDateInstance is not guaranteed to hand back a
+    // SimpleDateFormat, so the former downcast could fail with a ClassCastException.
+    DateFormat format = DateFormat.getDateInstance(DateFormat.MEDIUM);
+    this.beginTime = format.parse(beginTime);
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  public void setColumnFamily(String columnFamily) {
+    this.columnFamily = columnFamily;
+  }
+
+  public Date getBeginTime() {
+    return beginTime;
+  }
+  public int getPort() {
+    return port;
+  }
+  public URL getEndpoint() {
+    return endpoint;
+  }
+
+  public URL getProxy() {
+    return proxy;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public ConnectionType getType() {
+    return type;
+  }
+
+  public String getCollection() {
+    return collection;
+  }
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  /** Bean-style counterpart of {@link #withAllowedIndicatorTypes(List)}. */
+  public void setAllowedIndicatorTypes(List<String> allowedIndicatorTypes) {
+    withAllowedIndicatorTypes(allowedIndicatorTypes);
+  }
+
+  /** @return the live set of allowed indicator types (not a copy) */
+  public Set<String> getAllowedIndicatorTypes() {
+    return allowedIndicatorTypes;
+  }
+
+  /**
+   * Reads a configuration from a JSON stream using the shared ObjectMapper.
+   *
+   * @throws IOException if the stream cannot be read or mapped
+   */
+  public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
+    TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
+    return ret;
+  }
+  /** Reads a configuration from JSON text, encoding it with the given charset first. */
+  public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
+    return load( new ByteArrayInputStream(s.getBytes(c)));
+  }
+  /** Reads a configuration from JSON text using the platform default charset. */
+  public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
+    return load( s, Charset.defaultCharset());
+  }
+
+  /** Renders all settings; a non-null password is masked rather than printed. */
+  @Override
+  public String toString() {
+    return "TaxiiConnectionConfig{" +
+            "endpoint=" + endpoint +
+            ", port=" + port +
+            ", proxy=" + proxy +
+            ", username='" + username + '\'' +
+            ", password=" + (password == null?"null" : "'******'") +
+            ", type=" + type +
+            ", allowedIndicatorTypes=" + Joiner.on(',').join(allowedIndicatorTypes)+
+            ", collection='" + collection + '\'' +
+            ", subscriptionId='" + subscriptionId + '\'' +
+            ", beginTime=" + beginTime +
+            ", table=" + table + ":" + columnFamily+
+            '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
new file mode 100644
index 0000000..1e45f94
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentConverter;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.mitre.taxii.client.HttpClient;
+import org.mitre.taxii.messages.xml11.*;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class TaxiiHandler extends TimerTask {
+ private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
+
+ private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() { // per-thread TaxiiXmlFactory, created on first get()
+ @Override
+ protected TaxiiXmlFactory initialValue() {
+ return new TaxiiXmlFactory();
+ }
+ };
+ private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() { // per-thread factory for TAXII request messages, created on first get()
+ @Override
+ protected ObjectFactory initialValue() {
+ return new ObjectFactory();
+ }
+ };
+
+ private HttpClient taxiiClient; // TAXII client used for poll requests; built in initializeClient
+ private URL endpoint; // URL the poll requests are sent to
+ private Extractor extractor; // turns serialized content XML into key/value pairs
+ private String hbaseTable; // name of the HBase table that receives the Puts
+ private String columnFamily; // column family handed to the converter when building Puts
+ private Map<String, HTableInterface> connectionCache = new HashMap<>(); // table name -> opened table, filled by getTable
+ private HttpClientContext context; // HTTP context holding credentials; null when no username/password was configured
+ private String collection; // collection name sent with every poll request
+ private String subscriptionId; // optional; when null the request carries default poll parameters instead
+ private EnrichmentConverter converter = new EnrichmentConverter(); // converts key/value pairs into HBase Puts
+ private Date beginTime; // exclusive begin timestamp of the next poll; null means none is set; updated by run()
+ private Configuration config; // Hadoop configuration used when opening HBase tables
+ private boolean inProgress = false; // true while run() is executing; a run() that sees true returns immediately
+ private Set<String> allowedIndicatorTypes; // key types to persist; an empty set means every type is accepted
+ /**
+  * Creates a handler from the connection settings and initializes the TAXII client.
+  *
+  * @param connectionConfig collection, subscription, table, column family, begin time
+  *                         and connection details
+  * @param extractor        extractor applied to every serialized content document
+  * @param config           Hadoop configuration used when opening HBase tables
+  * @throws Exception if client initialization fails
+  */
+ public TaxiiHandler(TaxiiConnectionConfig connectionConfig, Extractor extractor, Configuration config) throws Exception {
+   LOG.info("Loading configuration: " + connectionConfig);
+   this.extractor = extractor;
+   this.config = config;
+   this.hbaseTable = connectionConfig.getTable();
+   this.columnFamily = connectionConfig.getColumnFamily();
+   this.collection = connectionConfig.getCollection();
+   this.subscriptionId = connectionConfig.getSubscriptionId();
+   this.beginTime = connectionConfig.getBeginTime();
+   this.allowedIndicatorTypes = connectionConfig.getAllowedIndicatorTypes();
+   // The collection must be assigned before this call: client initialization reads it.
+   initializeClient(connectionConfig);
+   LOG.info("Configured, starting polling " + this.endpoint + " for " + this.collection);
+ }
+
+ /**
+  * Returns the table handle for {@code table}, opening and caching it on first use.
+  *
+  * @param table name of the HBase table
+  * @return the cached or newly created table handle
+  * @throws IOException if the table cannot be opened
+  */
+ protected synchronized HTableInterface getTable(String table) throws IOException {
+   HTableInterface cached = connectionCache.get(table);
+   if (cached != null) {
+     return cached;
+   }
+   HTableInterface opened = createHTable(table);
+   connectionCache.put(table, opened);
+   return opened;
+ }
+
+ protected synchronized HTableInterface createHTable(String tableInfo) throws IOException {
+ return new HTable(config, tableInfo);
+ }
+ /**
+ * The action to be performed by this timer task.
+ */
+ @Override
+ public void run() {
+ if(inProgress) {
+ return;
+ }
+ Date ts = new Date();
+ LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+ try {
+ inProgress = true;
+ // Prepare the message to send.
+ String sessionID = MessageHelper.generateMessageId();
+ PollRequest request = messageFactory.get().createPollRequest()
+ .withMessageId(sessionID)
+ .withCollectionName(collection);
+ if (subscriptionId != null) {
+ request = request.withSubscriptionID(subscriptionId);
+ } else {
+ request = request.withPollParameters(messageFactory.get().createPollParametersType());
+ }
+ if (beginTime != null) {
+ Calendar gc = GregorianCalendar.getInstance();
+ gc.setTime(beginTime);
+ XMLGregorianCalendar gTime = null;
+ try {
+ gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar((GregorianCalendar) gc).normalize();
+ } catch (DatatypeConfigurationException e) {
+ LOG.error("Unable to set the begin time", e);
+ }
+ gTime.setFractionalSecond(null);
+ LOG.info("Begin Time: " + gTime);
+ request.setExclusiveBeginTimestamp(gTime);
+ }
+
+ try {
+ PollResponse response = call(request, PollResponse.class);
+ LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
+ int numProcessed = 0;
+ long avgTimeMS = 0;
+ long timeStartedBlock = System.currentTimeMillis();
+ for (ContentBlock block : response.getContentBlocks()) {
+ AnyMixedContentType content = block.getContent();
+ for (Object o : content.getContent()) {
+ numProcessed++;
+ long timeS = System.currentTimeMillis();
+ String xml = null;
+ if (o instanceof Element) {
+ Element element = (Element) o;
+ xml = getStringFromDocument(element.getOwnerDocument());
+ if(LOG.isDebugEnabled() && Math.random() < 0.01) {
+ LOG.debug("Random Stix doc: " + xml);
+ }
+ for (LookupKV<EnrichmentKey, EnrichmentValue> kv : extractor.extract(xml)) {
+ if(allowedIndicatorTypes.isEmpty()
+ || allowedIndicatorTypes.contains(kv.getKey().type)
+ )
+ {
+ kv.getValue().getMetadata().put("source_type", "taxii");
+ kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
+ kv.getValue().getMetadata().put("taxii_collection", collection);
+ Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue());
+ HTableInterface table = getTable(hbaseTable);
+ table.put(p);
+ LOG.info("Found Threat Intel: " + kv.getKey() + " => " + kv.getValue());
+ }
+ }
+ }
+ avgTimeMS += System.currentTimeMillis() - timeS;
+ }
+ if( (numProcessed + 1) % 100 == 0) {
+ LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + avgTimeMS / content.getContent().size());
+ timeStartedBlock = System.currentTimeMillis();
+ avgTimeMS = 0;
+ numProcessed = 0;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException("Unable to make request", e);
+ }
+ }
+ finally {
+ inProgress = false;
+ beginTime = ts;
+ }
+ }
+ /**
+  * Serializes a DOM document to its XML string form using a default transformer.
+  *
+  * @param doc the document to serialize
+  * @return the XML text, or {@code null} if the transformation fails
+  */
+ public String getStringFromDocument(Document doc)
+ {
+   try
+   {
+     StringWriter writer = new StringWriter();
+     Transformer transformer = TransformerFactory.newInstance().newTransformer();
+     transformer.transform(new DOMSource(doc), new StreamResult(writer));
+     return writer.toString();
+   }
+   catch(TransformerException ex)
+   {
+     // Report through the logger instead of stderr so the failure reaches the
+     // configured log output together with the rest of the polling messages.
+     LOG.error("Unable to serialize the document to XML", ex);
+     return null;
+   }
+ }
+ private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
+ return call(taxiiClient, endpoint.toURI(), request, context, responseClazz);
+ }
+
+ /**
+  * Creates the HTTP context (if not already present), resolves the polling endpoint
+  * and builds the TAXII client.
+  *
+  * <p>For a {@code DISCOVER} connection the configured endpoint is treated as a
+  * discovery service and the advertised poll endpoint is used. For any other
+  * connection type the configured endpoint is used as the poll endpoint directly;
+  * the endpoint field is therefore always assigned, not only after discovery.
+  *
+  * @param config connection settings (endpoint, credentials, port, proxy, type)
+  * @throws Exception if discovery or client construction fails
+  */
+ private void initializeClient(TaxiiConnectionConfig config) throws Exception {
+   LOG.info("Initializing client..");
+   if(context == null) {
+     context = createContext(config.getEndpoint(), config.getUsername(), config.getPassword(), config.getPort());
+   }
+   URL resolved = config.getEndpoint();
+   if(config.getType() == ConnectionType.DISCOVER) {
+     LOG.info("Discovering endpoint");
+     resolved = discoverPollingClient(config.getProxy(), resolved, config.getUsername(), config.getPassword(), context, collection).pollEndpoint;
+     LOG.info("Discovered endpoint as " + resolved);
+   }
+   // Assign unconditionally: leaving the field unset for non-DISCOVER connections
+   // would make the first poll fail when it converts the endpoint to a URI.
+   this.endpoint = resolved;
+   taxiiClient = buildClient(config.getProxy(), config.getUsername(), config.getPassword());
+ }
+
+ /** Mutable holder for the endpoints and collection names found during discovery. */
+ private static class DiscoveryResults {
+   /** Address of the available poll service; stays null when none was advertised. */
+   URL pollEndpoint;
+   /** Address of the available collection management service; may stay null. */
+   URL collectionManagementEndpoint;
+   /** Collection names reported by the collection management service. */
+   List<String> collections = new ArrayList<String>();
+ }
+ /**
+  * Queries a discovery service for its available poll and collection management endpoints.
+  *
+  * <p>When {@code defaultCollection} is null the available collections are listed in the
+  * log and the JVM exits with status 0, so the operator can pick one and re-run.
+  *
+  * @param proxy             optional HTTP proxy, may be null
+  * @param endpoint          discovery service URL
+  * @param username          optional user name (must be given together with the password)
+  * @param password          optional password
+  * @param context           HTTP context passed to every call, may be null
+  * @param defaultCollection configured collection name, or null to list the collections
+  * @return the discovered endpoints
+  * @throws RuntimeException if no available poll service is advertised, or if collections
+  *         must be listed but no collection management service is advertised
+  * @throws Exception if building the client or calling the service fails
+  */
+ private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+
+   DiscoveryResults results = new DiscoveryResults();
+   // One client serves both the discovery and the collection information request.
+   HttpClient discoverClient = buildClient(proxy, username, password);
+
+   DiscoveryRequest discoveryRequest = messageFactory.get().createDiscoveryRequest()
+       .withMessageId(MessageHelper.generateMessageId());
+   DiscoveryResponse discoveryResponse = call(discoverClient, endpoint.toURI(), discoveryRequest, context, DiscoveryResponse.class);
+   for (ServiceInstanceType serviceInstance : discoveryResponse.getServiceInstances()) {
+     if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
+       results.pollEndpoint = new URL(serviceInstance.getAddress());
+     }
+     else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
+       results.collectionManagementEndpoint = new URL(serviceInstance.getAddress());
+     }
+   }
+   if (results.pollEndpoint == null) {
+     throw new RuntimeException("Unable to discover a poll TAXII feed");
+   }
+
+   if(defaultCollection == null) {
+     // No collection configured: list what the server offers, then stop.
+     if (results.collectionManagementEndpoint == null) {
+       // Fail with an explanation instead of dereferencing a missing endpoint.
+       throw new RuntimeException("No collection was configured and no available collection management service was discovered to list collections from");
+     }
+     CollectionInformationRequest infoRequest = messageFactory.get().createCollectionInformationRequest()
+         .withMessageId(MessageHelper.generateMessageId());
+     CollectionInformationResponse infoResponse = call(discoverClient, results.collectionManagementEndpoint.toURI(), infoRequest, context, CollectionInformationResponse.class);
+     LOG.info("Unable to find the default collection; available collections are:");
+     for(CollectionRecordType c : infoResponse.getCollections()) {
+       LOG.info(c.getCollectionName());
+       results.collections.add(c.getCollectionName());
+     }
+     // NOTE(review): terminating the JVM from a helper is kept for compatibility.
+     System.exit(0);
+   }
+   return results;
+ }
+
+ private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
+ HttpClientContext context = null;
+ HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
+ if (username != null && password != null) {
+
+ CredentialsProvider credsProvider = new BasicCredentialsProvider();
+ credsProvider.setCredentials(
+ new AuthScope(target.getHostName(), target.getPort()),
+ new UsernamePasswordCredentials(username, password));
+
+ // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
+ AuthCache authCache = new BasicAuthCache();
+ authCache.put(target, new BasicScheme());
+
+ // Add AuthCache to the execution context
+ context = HttpClientContext.create();
+ context.setCredentialsProvider(credsProvider);
+ context.setAuthCache(authCache);
+ } else {
+ context = null;
+ }
+ return context;
+ }
+
+
+ /**
+  * Sends {@code request} to the service at {@code endpoint} and returns the reply as an
+  * instance of {@code responseClazz}.
+  *
+  * @param taxiiClient   client used to make the call
+  * @param endpoint      address of the service
+  * @param request       message to send
+  * @param context       HTTP context passed to the client
+  * @param responseClazz type the reply is expected to have
+  * @return the reply cast to {@code responseClazz}
+  * @throws RuntimeException if the reply is of a different type; the message contains the
+  *         marshalled reply and the cause is the {@link ClassCastException}
+  */
+ public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
+                                                       , URI endpoint
+                                                       , REQUEST_T request
+                                                       , HttpClientContext context
+                                                       , Class<RESPONSE_T> responseClazz
+                                                       ) throws JAXBException, IOException {
+   final Object reply = taxiiClient.callTaxiiService(endpoint, request, context);
+   final String requestType = request.getClass().getCanonicalName();
+   final String replyType = reply.getClass().getCanonicalName();
+   LOG.info("Request made : " + requestType + " => " + replyType + " (expected " + responseClazz.getCanonicalName() + ")");
+   try {
+     return responseClazz.cast(reply);
+   }
+   catch(ClassCastException cce) {
+     // Marshal the unexpected reply so the log shows what the server actually sent.
+     final String marshalled = xmlFactory.get().createTaxiiXml().marshalToString(reply, true);
+     final String msg = "Didn't return the response we expected: " + reply.getClass() + " \n" + marshalled;
+     LOG.error(msg, cce);
+     throw new RuntimeException(msg, cce);
+   }
+ }
+ /**
+  * Builds a TAXII HTTP client with an optional proxy and a pooled connection manager
+  * that serves both plain HTTP and HTTPS.
+  *
+  * <p>SECURITY NOTE(review): the HTTPS socket factory trusts self-signed certificates,
+  * and the method disables the JSSE SNI extension through a JVM-wide system property.
+  * Both are kept as in the original behavior; confirm they are acceptable for the
+  * deployment.
+  *
+  * @param proxy    optional HTTP proxy, may be null
+  * @param username user name; must be null exactly when {@code password} is null
+  * @param password password; must be null exactly when {@code username} is null
+  * @return the configured client
+  * @throws IllegalArgumentException if only one of user name and password is given
+  * @throws Exception if the SSL context cannot be created
+  */
+ private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
+ {
+   // Validate the arguments before allocating any client resources.
+   if (username != null ^ password != null) {
+     throw new IllegalArgumentException("'username' and 'password' arguments are required to appear together.");
+   }
+
+   HttpClient client = new HttpClient(); // Start with a default TAXII HTTP client.
+
+   // Create an Apache HttpClientBuilder that also honors the standard system properties.
+   HttpClientBuilder builder = HttpClientBuilder.create().useSystemProperties();
+
+   // Proxy
+   if (proxy != null) {
+     HttpHost proxyHost = new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol());
+     builder.setProxy(proxyHost);
+   }
+
+   // from: http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
+   SSLContextBuilder ssbldr = new SSLContextBuilder();
+   ssbldr.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+   SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(ssbldr.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+   Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+       .register("http", new PlainConnectionSocketFactory())
+       .register("https", sslsf)
+       .build();
+
+   PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
+   cm.setMaxTotal(20);//max connection
+
+   System.setProperty("jsse.enableSNIExtension", "false"); //""
+   CloseableHttpClient httpClient = builder
+       .setSSLSocketFactory(sslsf)
+       .setConnectionManager(cm)
+       .build();
+
+   client.setHttpclient(httpClient);
+   return client;
+ }
+ /**
+  * Scratch entry point: assembles a sample discovery configuration and does nothing
+  * else with it (no handler is created and no request is sent).
+  */
+ public static void main(String... argv) throws Exception {
+   final String username = "guest";
+   final String password = "guest";
+   final URL endpoint = new URL("http://hailataxii.com/taxii-discovery-service");
+   TaxiiConnectionConfig config = new TaxiiConnectionConfig();
+   config = config.withConnectionType(ConnectionType.DISCOVER)
+       .withEndpoint(endpoint)
+       .withUsername(username)
+       .withCollection("guest.Abuse_ch")
+       .withPassword(password);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
new file mode 100644
index 0000000..712fcf3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiLoader.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.taxii;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.stix.StixExtractor;
+import org.apache.metron.enrichment.EnrichmentConfig;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.text.*;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+
+public class TaxiiLoader {
+ private static abstract class OptionHandler implements Function<String, Option> {} // maps an option's short code to its commons-cli Option definition
+ private enum TaxiiOptions {
+ HELP("h", new OptionHandler() { // -h / --help: flag without an argument
+
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ return new Option(s, "help", false, "Generate Help screen");
+ }
+ })
+ ,EXTRACTOR_CONFIG("e", new OptionHandler() { // -e / --extractor_config: required, takes a file argument
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+ o.setArgName("JSON_FILE");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ ,CONNECTION_CONFIG("c", new OptionHandler() { // -c / --taxii_connection_config: required, takes a file argument
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "taxii_connection_config", true, "The JSON config file to configure the connection");
+ o.setArgName("config_file");
+ o.setRequired(true);
+ return o;
+ }
+ })
+ ,TIME_BETWEEN_POLLS("p", new OptionHandler() { // -p / --time_between_polls: optional, milliseconds
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "time_between_polls", true, "The time between polls (in ms)");
+ o.setArgName("MS");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ,BEGIN_TIME("b", new OptionHandler() { // -b / --begin_time: optional; the argument name shown in help is the DATE_FORMAT pattern
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "begin_time", true, "Start time to poll the Taxii server (all data from that point will be gathered in the first pull).");
+ o.setArgName(DATE_FORMAT.toPattern());
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ,LOG4J_PROPERTIES("l", new OptionHandler() { // -l / --log4j: optional, takes a file argument
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+ o.setArgName("FILE");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ,ENRICHMENT_CONFIG("n", new OptionHandler() { // -n / --enrichment_config: optional, takes a file argument
+ @Nullable
+ @Override
+ public Option apply(@Nullable String s) {
+ Option o = new Option(s, "enrichment_config", true
+ , "JSON Document describing the enrichment configuration details." +
+ " This is used to associate an enrichment type with a field type in zookeeper."
+ );
+ o.setArgName("JSON_FILE");
+ o.setRequired(false);
+ return o;
+ }
+ })
+ ;
+ /** The commons-cli definition produced by this constant's handler. */
+ Option option;
+ /** The short code the option is looked up by on a parsed command line. */
+ String shortCode;
+
+ /**
+  * @param shortCode     short option code
+  * @param optionHandler builds the option definition from the short code
+  */
+ TaxiiOptions(String shortCode, OptionHandler optionHandler) {
+   this.option = optionHandler.apply(shortCode);
+   this.shortCode = shortCode;
+ }
+
+ /**
+  * @param cli the parsed command line
+  * @return whether this option was supplied
+  */
+ public boolean has(CommandLine cli) {
+   return cli.hasOption(this.shortCode);
+ }
+
+ /**
+  * @param cli the parsed command line
+  * @return the value given for this option, as reported by the command line
+  */
+ public String get(CommandLine cli) {
+   return cli.getOptionValue(this.shortCode);
+ }
+
+ /**
+  * Parses the arguments against all declared options.
+  *
+  * <p>Exits the JVM with status 0 after printing the help screen when the help option is
+  * present, and with status -1 (after printing the error and the help screen to the
+  * console) when the arguments cannot be parsed.
+  *
+  * @param parser the parser to use
+  * @param args   the raw arguments
+  * @return the parsed command line
+  */
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+   final CommandLine cli;
+   try {
+     cli = parser.parse(getOptions(), args);
+   } catch (ParseException e) {
+     System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+     e.printStackTrace(System.err);
+     printHelp();
+     System.exit(-1);
+     return null;
+   }
+   if (HELP.has(cli)) {
+     printHelp();
+     System.exit(0);
+   }
+   return cli;
+ }
+
+ /** Prints the usage screen for all declared options under the name "TaxiiLoader". */
+ public static void printHelp() {
+   new HelpFormatter().printHelp( "TaxiiLoader", getOptions());
+ }
+
+ /**
+  * @return a new option set containing the definition of every declared constant, in
+  *         declaration order
+  */
+ public static Options getOptions() {
+   final Options all = new Options();
+   final TaxiiOptions[] declared = TaxiiOptions.values();
+   for (int i = 0; i < declared.length; i++) {
+     all.addOption(declared[i].option);
+   }
+   return all;
+ }
+ }
+ public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // format of the begin_time option; SimpleDateFormat is not thread-safe, so do not use this instance concurrently
+ public static final long ONE_HR_IN_MS = 60*60*1000; // one hour expressed in milliseconds
+ public static final long DEFAULT_TIME_BETWEEN_POLLS = ONE_HR_IN_MS; // poll period used when time_between_polls is not given
+
+
+ public static void main(String... argv) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+ CommandLine cli = TaxiiOptions.parse(new PosixParser(), otherArgs);
+ if(TaxiiOptions.LOG4J_PROPERTIES.has(cli)) {
+ PropertyConfigurator.configure(TaxiiOptions.LOG4J_PROPERTIES.get(cli));
+ }
+ ExtractorHandler handler = ExtractorHandler.load(FileUtils.readFileToString(new File(TaxiiOptions.EXTRACTOR_CONFIG.get(cli))));
+ Extractor e = handler.getExtractor();
+ EnrichmentConfig enrichmentConfig = null;
+ if(TaxiiOptions.ENRICHMENT_CONFIG.has(cli)) {
+ enrichmentConfig = JSONUtils.INSTANCE.load( new File(TaxiiOptions.ENRICHMENT_CONFIG.get(cli))
+ , EnrichmentConfig.class
+ );
+ enrichmentConfig.updateSensorConfigs();
+ }
+
+ Timer timer = new Timer();
+ if(e instanceof StixExtractor) {
+ StixExtractor extractor = (StixExtractor)e;
+ TaxiiConnectionConfig connectionConfig = TaxiiConnectionConfig.load(FileUtils.readFileToString(new File(TaxiiOptions.CONNECTION_CONFIG.get(cli))));
+ if(TaxiiOptions.BEGIN_TIME.has(cli)) {
+ Date d = DATE_FORMAT.parse(TaxiiOptions.BEGIN_TIME.get(cli));
+ connectionConfig.withBeginTime(d);
+ }
+ long timeBetween = DEFAULT_TIME_BETWEEN_POLLS;
+ if(TaxiiOptions.TIME_BETWEEN_POLLS.has(cli)) {
+ timeBetween = Long.parseLong(TaxiiOptions.TIME_BETWEEN_POLLS.get(cli));
+ }
+ timer.scheduleAtFixedRate(new TaxiiHandler(connectionConfig, extractor, conf), 0, timeBetween);
+ }
+ else {
+ throw new IllegalStateException("Extractor must be a STIX Extractor");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
deleted file mode 100644
index 1e95507..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-public enum ConnectionType {
- POLL, DISCOVER;
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
deleted file mode 100644
index ddf542e..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.client.HTableInterface;
-
-public class TableInfo {
- private String tableName;
- private String columnFamily;
- public TableInfo(String s) {
- Iterable<String> i = Splitter.on(":").split(s);
- if(Iterables.size(i) != 2) {
- throw new IllegalStateException("Malformed table:cf => " + s);
- }
- tableName = Iterables.getFirst(i, null);
- columnFamily = Iterables.getLast(i);
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getColumnFamily() {
- return columnFamily;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TableInfo tableInfo = (TableInfo) o;
-
- if (getTableName() != null ? !getTableName().equals(tableInfo.getTableName()) : tableInfo.getTableName() != null)
- return false;
- return getColumnFamily() != null ? getColumnFamily().equals(tableInfo.getColumnFamily()) : tableInfo.getColumnFamily() == null;
-
- }
-
- @Override
- public int hashCode() {
- int result = getTableName() != null ? getTableName().hashCode() : 0;
- result = 31 * result + (getColumnFamily() != null ? getColumnFamily().hashCode() : 0);
- return result;
- }
-
- @Override
- public String toString() {
- return "TableInfo{" +
- "tableName='" + tableName + '\'' +
- ", columnFamily='" + columnFamily + '\'' +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
deleted file mode 100644
index dab8f0c..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TaxiiConnectionConfig {
- final static ObjectMapper _mapper = new ObjectMapper();
- private URL endpoint;
- private int port = 443;
- private URL proxy;
- private String username;
- private String password;
- private ConnectionType type;
- private String collection = "default";
- private String subscriptionId = null;
- private Date beginTime;
- private Map<String, TableInfo> tableMap;
- public TaxiiConnectionConfig withTableMap(Map<String, TableInfo> tableMap) {
- this.tableMap = tableMap;
- return this;
- }
- public TaxiiConnectionConfig withBeginTime(Date time) {
- this.beginTime = time;
- return this;
- }
- public TaxiiConnectionConfig withSubscriptionId(String subId) {
- this.subscriptionId = subId;
- return this;
- }
- public TaxiiConnectionConfig withCollection(String collection) {
- this.collection = collection;
- return this;
- }
-
- public TaxiiConnectionConfig withPort(int port) {
- this.port = port;
- return this;
- }
- public TaxiiConnectionConfig withEndpoint(URL endpoint) {
- this.endpoint = endpoint;
- return this;
- }
- public TaxiiConnectionConfig withProxy(URL proxy) {
- this.proxy = proxy;
- return this;
- }
- public TaxiiConnectionConfig withUsername(String username) {
- this.username = username;
- return this;
- }
- public TaxiiConnectionConfig withPassword(String password) {
- this.password = password;
- return this;
- }
- public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
- this.type= type;
- return this;
- }
-
- public void setEndpoint(String endpoint) throws MalformedURLException {
- this.endpoint = new URL(endpoint);
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public void setProxy(String proxy) throws MalformedURLException {
- this.proxy = new URL(proxy);
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public void setType(ConnectionType type) {
- this.type = type;
- }
-
- public void setCollection(String collection) {
- this.collection = collection;
- }
-
- public void setSubscriptionId(String subscriptionId) {
- this.subscriptionId = subscriptionId;
- }
-
- public void setBeginTime(String beginTime) throws ParseException {
- SimpleDateFormat sdf = (SimpleDateFormat)DateFormat.getDateInstance(DateFormat.MEDIUM);
- this.beginTime = sdf.parse(beginTime);
- }
-
- public void setTableMap(Map<String, String> tableMap) {
- this.tableMap = new HashMap<>();
- for(Map.Entry<String, String> kv : tableMap.entrySet()) {
- this.tableMap.put(kv.getKey(), new TableInfo(kv.getValue()));
- }
- }
-
- public Map<String, TableInfo> getTableMap() {
- return tableMap;
- }
-
- public Date getBeginTime() {
- return beginTime;
- }
- public int getPort() {
- return port;
- }
- public URL getEndpoint() {
- return endpoint;
- }
-
- public URL getProxy() {
- return proxy;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public ConnectionType getType() {
- return type;
- }
-
- public String getCollection() {
- return collection;
- }
- public String getSubscriptionId() {
- return subscriptionId;
- }
- public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
- TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
- return ret;
- }
- public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
- return load( new ByteArrayInputStream(s.getBytes(c)));
- }
- public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
- return load( s, Charset.defaultCharset());
- }
-
- @Override
- public String toString() {
- return "TaxiiConnectionConfig{" +
- "endpoint=" + endpoint +
- ", port=" + port +
- ", proxy=" + proxy +
- ", username='" + username + '\'' +
- ", password=" + (password == null?"null" : "'******'") +
- ", type=" + type +
- ", collection='" + collection + '\'' +
- ", subscriptionId='" + subscriptionId + '\'' +
- ", beginTime=" + beginTime +
- ", tableMap=" + tableMap +
- '}';
- }
-}