You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:52 UTC
[10/43] incubator-metron git commit: METRON-50: Ingest threat intel
data from Taxii feeds closes apache/incubator-metron#29
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..8f1f205
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Command line driver that configures and submits a map-only MapReduce job
+ * running {@link PrunerMapper} over an HBase table. The job is parameterized
+ * with an "as of" timestamp and with the table / column family that hold the
+ * access trackers.
+ */
+public class LeastRecentlyUsedPruner {
+  /** Builds the commons-cli {@link Option} that belongs to a given short code. */
+  private static abstract class OptionHandler implements Function<String, Option> {}
+
+  /**
+   * The command line options understood by this tool. Each constant carries
+   * its short code and the {@link Option} built from it.
+   */
+  private enum BulkLoadOptions {
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,TABLE("t", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "table", true, "HBase table to prune");
+        o.setRequired(true);
+        o.setArgName("HBASE_TABLE");
+        return o;
+      }
+    })
+    ,COLUMN_FAMILY("f", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+        o.setRequired(false);
+        o.setArgName("CF_NAME");
+        return o;
+      }
+    })
+    ,AS_OF_TIME("a", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+        o.setArgName("datetime");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    // "t" is already taken by TABLE; two options must not share a short code,
+    // so the format option gets its own one.
+    ,AS_OF_TIME_FORMAT("v", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+        Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+        o.setArgName("format");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,ACCESS_TABLE("u", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+        o.setRequired(true);
+        o.setArgName("HBASE_TABLE");
+        return o;
+      }
+    })
+    ,ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+        o.setRequired(true);
+        o.setArgName("CF_NAME");
+        return o;
+      }
+    });
+
+    final Option option;
+    final String shortCode;
+
+    BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    /**
+     * @param cli the parsed command line
+     * @return true if this option was supplied
+     */
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    /**
+     * @param cli the parsed command line
+     * @return the value supplied for this option, as reported by
+     *         {@link CommandLine#getOptionValue(String)}
+     */
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    /**
+     * Parses the as_of option with the format chosen by {@link #getFormat}.
+     *
+     * @param cli the parsed command line
+     * @return the as_of time in milliseconds since the epoch
+     * @throws java.text.ParseException if the as_of value does not match the format
+     */
+    private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+      Date d = getFormat(cli).parse(AS_OF_TIME.get(cli));
+      return d.getTime();
+    }
+
+    /**
+     * @param cli the parsed command line
+     * @return a formatter for the as_of_format pattern if one was supplied,
+     *         otherwise a default {@link SimpleDateFormat}
+     */
+    private static DateFormat getFormat(CommandLine cli) {
+      if (AS_OF_TIME_FORMAT.has(cli)) {
+        return new SimpleDateFormat(AS_OF_TIME_FORMAT.get(cli));
+      }
+      return new SimpleDateFormat();
+    }
+
+    /**
+     * Parses the arguments. Prints the help screen and exits the JVM if help
+     * was requested (status 0) or if the arguments could not be parsed
+     * (status -1).
+     *
+     * @param parser the commons-cli parser to use
+     * @param args the arguments to parse
+     * @return the parsed command line
+     */
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if (HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    /** Prints the usage screen for this tool. */
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      // Report this tool's own name rather than the bulk loader's.
+      formatter.printHelp( "LeastRecentlyUsedPruner", getOptions());
+    }
+
+    /**
+     * @return an {@link Options} holding the option of every enum constant
+     */
+    public static Options getOptions() {
+      Options ret = new Options();
+      for (BulkLoadOptions o : BulkLoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * Wires the HBase input scan and table output into the job.
+   *
+   * @param job the job to configure
+   * @param sourceTable the table used both as mapper input and as output table
+   * @param cf the column family to restrict the scan to; the scan is left
+   *           unrestricted when null
+   * @throws IOException if the table mapper / reducer setup fails
+   */
+  public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+    Scan scan = new Scan();
+    if (cf != null) {
+      scan.addFamily(Bytes.toBytes(cf));
+    }
+    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+    scan.setCacheBlocks(false);  // don't set to true for MR jobs
+
+    TableMapReduceUtil.initTableMapperJob(
+        sourceTable,        // input table
+        scan,               // Scan instance to control CF and attribute selection
+        PrunerMapper.class, // mapper class
+        null,               // mapper output key
+        null,               // mapper output value
+        job);
+    TableMapReduceUtil.initTableReducerJob(
+        sourceTable,        // output table
+        null,               // reducer class
+        job);
+  }
+
+  /**
+   * Creates the map-only job and stores the timestamp and access tracker
+   * settings in its configuration under the {@link PrunerMapper} keys.
+   *
+   * @param conf the base configuration
+   * @param table the table to scan; also stored as the access tracker name
+   * @param cf the column family to scan, may be null
+   * @param accessTrackerTable the table containing the access trackers
+   * @param accessTrackerColumnFamily the column family of the access tracker table
+   * @param ts the as_of time in milliseconds since the epoch
+   * @return the configured, not yet submitted job
+   * @throws IOException if the job cannot be created or configured
+   */
+  public static Job createJob( Configuration conf
+                             , String table
+                             , String cf
+                             , String accessTrackerTable
+                             , String accessTrackerColumnFamily
+                             , Long ts
+                             ) throws IOException
+  {
+    Job job = new Job(conf);
+    job.setJobName("LeastRecentlyUsedPruner: Pruning " + table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+    System.out.println("Configuring " + job.getJobName());
+    job.setJarByClass(LeastRecentlyUsedPruner.class);
+    Configuration jobConf = job.getConfiguration();
+    jobConf.setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+    jobConf.set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+    jobConf.set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+    jobConf.set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+    setupHBaseJob(job, table, cf);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /**
+   * Entry point: parses the arguments, runs the job and exits with status 0
+   * if it completed successfully, 1 otherwise.
+   */
+  public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    // Let Hadoop consume its generic options first; the rest belong to this tool.
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+    CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+    Long ts = BulkLoadOptions.getTimestamp(cli);
+    String table = BulkLoadOptions.TABLE.get(cli);
+    String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+    String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+    String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+    Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
new file mode 100644
index 0000000..7d7ef98
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.text.*;
+import java.util.Date;
+
+/**
+ * Command line driver that configures and submits a map-only MapReduce job
+ * running {@link BulkLoadMapper}, writing its output to an HBase table through
+ * {@link TableOutputFormat}.
+ */
+public class ThreatIntelBulkLoader {
+  /** Builds the commons-cli {@link Option} that belongs to a given short code. */
+  private static abstract class OptionHandler implements Function<String, Option> {}
+
+  /**
+   * The command line options understood by this tool. Each constant carries
+   * its short code and the {@link Option} built from it.
+   */
+  private enum BulkLoadOptions {
+    HELP("h", new OptionHandler() {
+
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    })
+    ,TABLE("t", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "table", true, "HBase table to import data into");
+        o.setRequired(true);
+        o.setArgName("HBASE_TABLE");
+        return o;
+      }
+    })
+    ,COLUMN_FAMILY("f", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
+        o.setRequired(true);
+        o.setArgName("CF_NAME");
+        return o;
+      }
+    })
+    ,EXTRACTOR_CONFIG("e", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+        o.setArgName("JSON_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,INPUT_DATA("i", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
+        o.setArgName("DIR");
+        o.setRequired(true);
+        return o;
+      }
+    })
+    ,AS_OF_TIME("a", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
+        o.setArgName("datetime");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    // "t" is already taken by TABLE; two options must not share a short code,
+    // so the format option gets its own one.
+    ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
+        o.setArgName("format");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ,CONVERTER("c", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
+        o.setArgName("class");
+        o.setRequired(false);
+        return o;
+      }
+    })
+    ;
+
+    final Option option;
+    final String shortCode;
+
+    BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    /**
+     * @param cli the parsed command line
+     * @return true if this option was supplied
+     */
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    /**
+     * @param cli the parsed command line
+     * @return the value supplied for this option, as reported by
+     *         {@link CommandLine#getOptionValue(String)}
+     */
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    /**
+     * Parses the arguments. Prints the help screen and exits the JVM if help
+     * was requested (status 0) or if the arguments could not be parsed
+     * (status -1).
+     *
+     * @param parser the commons-cli parser to use
+     * @param args the arguments to parse
+     * @return the parsed command line
+     */
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if (HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    /** Prints the usage screen for this tool. */
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
+    }
+
+    /**
+     * @return an {@link Options} holding the option of every enum constant
+     */
+    public static Options getOptions() {
+      Options ret = new Options();
+      for (BulkLoadOptions o : BulkLoadOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * Determines the timestamp to hand to the job.
+   *
+   * @param cli the parsed command line
+   * @return the parsed as_of time in milliseconds since the epoch, or the
+   *         current time if as_of was not supplied
+   * @throws IllegalStateException if as_of was supplied without as_of_format
+   * @throws java.text.ParseException if the as_of value does not match the format
+   */
+  private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+    if (!BulkLoadOptions.AS_OF_TIME.has(cli)) {
+      return System.currentTimeMillis();
+    }
+    if (!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+      throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
+    }
+    DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+    Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+    return d.getTime();
+  }
+
+  /**
+   * Reads the extractor configuration file using the platform default charset.
+   *
+   * @param configFile the local file to read
+   * @return the lines of the file joined with newlines
+   * @throws IOException if the file cannot be read
+   */
+  private static String readExtractorConfig(File configFile) throws IOException {
+    return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
+  }
+
+  /**
+   * Creates the map-only job and stores the load settings in its
+   * configuration under the {@link BulkLoadMapper} keys.
+   *
+   * @param conf the base configuration
+   * @param input the input path handed to the extractor's input format handler
+   * @param table the HBase output table
+   * @param cf the column family, stored for the mapper
+   * @param extractorConfigContents the extractor configuration document
+   * @param ts the timestamp stored under {@link BulkLoadMapper#LAST_SEEN_KEY}
+   * @param converter the converter whose class name is stored for the mapper
+   * @return the configured, not yet submitted job
+   * @throws IOException if the job or the extractor handler cannot be set up
+   */
+  public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
+    Job job = new Job(conf);
+    job.setJobName("ThreatIntelBulkLoader: " + input + " => " + table + ":" + cf);
+    System.out.println("Configuring " + job.getJobName());
+    job.setJarByClass(ThreatIntelBulkLoader.class);
+    job.setMapperClass(BulkLoadMapper.class);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    jobConf.set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+    jobConf.set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+    jobConf.set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
+    jobConf.set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    job.setNumReduceTasks(0);
+    // The extractor configuration decides how the input is read.
+    ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
+    handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
+    return job;
+  }
+
+  /**
+   * Entry point: parses the arguments, runs the job and exits with status 0
+   * if it completed successfully, 1 otherwise.
+   */
+  public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException, IllegalAccessException, InstantiationException {
+    Configuration conf = HBaseConfiguration.create();
+    // Let Hadoop consume its generic options first; the rest belong to this tool.
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+    CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+    long ts = getTimestamp(cli);
+    String input = BulkLoadOptions.INPUT_DATA.get(cli);
+    String table = BulkLoadOptions.TABLE.get(cli);
+    String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+    String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
+    // Fall back to the threat intel converter unless one was named explicitly.
+    String converterClass = ThreatIntelConverter.class.getName();
+    if (BulkLoadOptions.CONVERTER.has(cli)) {
+      converterClass = BulkLoadOptions.CONVERTER.get(cli);
+    }
+    HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).newInstance();
+    Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
+    System.out.println(conf);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
index 41f667b..30e56d8 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -17,12 +17,14 @@
*/
package org.apache.metron.dataloads.extractor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
import java.io.IOException;
import java.util.Map;
public interface Extractor {
- Iterable<ThreatIntelResults> extract(String line) throws IOException;
+ Iterable<LookupKV> extract(String line) throws IOException;
void initialize(Map<String, Object> config);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index f5ec434..94e9ccb 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -21,7 +21,12 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
import org.apache.metron.threatintel.ThreatIntelResults;
import java.io.IOException;
@@ -31,24 +36,26 @@ public class CSVExtractor implements Extractor {
public static final String COLUMNS_KEY="columns";
public static final String INDICATOR_COLUMN_KEY="indicator_column";
public static final String SEPARATOR_KEY="separator";
+ public static final String LOOKUP_CONVERTER = "lookupConverter";
private int indicatorColumn;
private Map<String, Integer> columnMap = new HashMap<>();
private CSVParser parser;
+ private LookupConverter converter = LookupConverters.THREAT_INTEL.getConverter();
@Override
- public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+ public Iterable<LookupKV> extract(String line) throws IOException {
if(line.trim().startsWith("#")) {
//comment
return Collections.emptyList();
}
- ThreatIntelResults ret = new ThreatIntelResults();
String[] tokens = parser.parseLine(line);
- ret.getKey().indicator = tokens[indicatorColumn];
+ LookupKey key = converter.toKey(tokens[indicatorColumn]);
+ Map<String, String> values = new HashMap<>();
for(Map.Entry<String, Integer> kv : columnMap.entrySet()) {
- ret.getValue().put(kv.getKey(), tokens[kv.getValue()]);
+ values.put(kv.getKey(), tokens[kv.getValue()]);
}
- return Arrays.asList(ret);
+ return Arrays.asList(new LookupKV(key, converter.toValue(values)));
}
private static Map.Entry<String, Integer> getColumnMapEntry(String column, int i) {
@@ -109,5 +116,8 @@ public class CSVExtractor implements Extractor {
parser = new CSVParserBuilder().withSeparator(separator)
.build();
}
+ if(config.containsKey(LOOKUP_CONVERTER)) {
+ converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
new file mode 100644
index 0000000..9e9b79f
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.util.Map;
+
+/**
+ * Turns raw extracted data (an indicator string and a map of metadata) into
+ * the key and value objects of a lookup entry.
+ */
+public interface LookupConverter {
+ /**
+ * @param indicator the indicator string to build a key from
+ * @return the lookup key for the indicator
+ */
+ LookupKey toKey(String indicator);
+ /**
+ * @param metadata the metadata to build a value from
+ * @return the lookup value for the metadata
+ */
+ LookupValue toValue(Map<String, String> metadata);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
new file mode 100644
index 0000000..7f9218a
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/csv/LookupConverters.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.extractor.csv;
+
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.util.Map;
+
+public enum LookupConverters {
+
+ THREAT_INTEL(new LookupConverter() {
+ @Override
+ public LookupKey toKey(String indicator) {
+ return new ThreatIntelKey(indicator);
+ }
+
+ @Override
+ public LookupValue toValue(Map<String, String> metadata) {
+ return new ThreatIntelValue(metadata);
+ }
+ })
+ ;
+ LookupConverter converter;
+ LookupConverters(LookupConverter converter) {
+ this.converter = converter;
+ }
+ public LookupConverter getConverter() {
+ return converter;
+ }
+
+ public static LookupConverter getConverter(String name) {
+ try {
+ return LookupConverters.valueOf(name).getConverter();
+ }
+ catch(Throwable t) {
+ try {
+ return (LookupConverter) Class.forName(name).newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Unable to parse " + name, e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to parse " + name, e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to parse " + name, e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
index 039092f..b3829b4 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/StixExtractor.java
@@ -23,9 +23,12 @@ import org.apache.commons.io.FileUtils;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandler;
import org.apache.metron.dataloads.extractor.stix.types.ObjectTypeHandlers;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.*;
import org.mitre.cybox.cybox_2.ObjectType;
+import org.mitre.cybox.cybox_2.Observable;
+import org.mitre.cybox.cybox_2.Observables;
import org.mitre.stix.common_1.IndicatorBaseType;
import org.mitre.stix.indicator_2.Indicator;
import org.mitre.stix.stix_1.STIXPackage;
@@ -39,9 +42,35 @@ import java.util.Map;
public class StixExtractor implements Extractor {
Map<String, Object> config;
@Override
- public Iterable<ThreatIntelResults> extract(String line) throws IOException {
- STIXPackage stixPackage = STIXPackage.fromXMLString(line);
- List<ThreatIntelResults> ret = new ArrayList<>();
+ public Iterable<LookupKV> extract(String line) throws IOException {
+ STIXPackage stixPackage = STIXPackage.fromXMLString(line.replaceAll("\"Equal\"", "\"Equals\""));
+ List<LookupKV> ret = new ArrayList<>();
+ for(Observable o : getObservables(stixPackage)) {
+ ObjectType obj = o.getObject();
+ if(obj != null) {
+ ObjectPropertiesType props = obj.getProperties();
+ if(props != null) {
+ ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
+ if (handler != null) {
+ Iterable<LookupKV> extractions = handler.extract(props, config);
+ for(LookupKV extraction : extractions) {
+ ret.add(extraction);
+ }
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ public List<Observable> getObservables(STIXPackage stixPackage) {
+ List<Observable> ret = new ArrayList<>();
+ Observables observables = stixPackage.getObservables();
+ if(observables != null) {
+ for (Observable o : observables.getObservables()) {
+ ret.add(o);
+ }
+ }
if (stixPackage.getIndicators() != null) {
if (stixPackage.getIndicators().getIndicators() != null) {
List<IndicatorBaseType> indicators = stixPackage.getIndicators().getIndicators();
@@ -49,12 +78,7 @@ public class StixExtractor implements Extractor {
for (int i = 0; i < indicatorCount; i++) {
Indicator indicator = (Indicator) indicators.get(i);
if (indicator.getObservable() != null) {
- ObjectType obj = indicator.getObservable().getObject();
- ObjectPropertiesType props = obj.getProperties();
- ObjectTypeHandler handler = ObjectTypeHandlers.getHandlerByInstance(props);
- if(handler != null) {
- Iterables.addAll(ret, handler.extract(props, config));
- }
+ ret.add(indicator.getObservable());
}
}
}
@@ -102,7 +126,7 @@ public class StixExtractor implements Extractor {
String line = FileUtils.readFileToString(file);
StixExtractor extractor = new StixExtractor();
- for(ThreatIntelResults results : extractor.extract(line)) {
+ for(LookupKV results : extractor.extract(line)) {
System.out.println(results);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
index ee4ff78..638a9ce 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/AddressHandler.java
@@ -19,10 +19,10 @@ package org.apache.metron.dataloads.extractor.stix.types;
import com.google.common.base.Splitter;
import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
-import org.mitre.cybox.common_2.ConditionApplicationEnum;
-import org.mitre.cybox.common_2.ConditionTypeEnum;
import org.mitre.cybox.common_2.StringObjectPropertyType;
import org.mitre.cybox.objects.Address;
import org.mitre.cybox.objects.CategoryTypeEnum;
@@ -42,8 +42,8 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
}
@Override
- public Iterable<ThreatIntelResults> extract(final Address type, Map<String, Object> config) throws IOException {
- List<ThreatIntelResults> ret = new ArrayList<>();
+ public Iterable<LookupKV> extract(final Address type, Map<String, Object> config) throws IOException {
+ List<LookupKV> ret = new ArrayList<>();
final CategoryTypeEnum category= type.getCategory();
if(!SUPPORTED_CATEGORIES.contains(category)) {
return ret;
@@ -61,13 +61,15 @@ public class AddressHandler extends AbstractObjectTypeHandler<Address> {
}
StringObjectPropertyType value = type.getAddressValue();
for(String token : StixExtractor.split(value)) {
- ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
+ LookupKV results = new LookupKV(new ThreatIntelKey(token)
+ , new ThreatIntelValue(
new HashMap<String, String>() {{
put("source-type", "STIX");
- put("indicator-type", "Address");
+ put("indicator-type", type.getClass().getSimpleName() + ":" + category);
put("source", type.toXMLString());
}}
- );
+ )
+ );
ret.add(results);
}
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
index e0444fb..233550b 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/DomainHandler.java
@@ -18,7 +18,9 @@
package org.apache.metron.dataloads.extractor.stix.types;
import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.StringObjectPropertyType;
import org.mitre.cybox.objects.DomainName;
@@ -34,19 +36,21 @@ public class DomainHandler extends AbstractObjectTypeHandler<DomainName> {
}
@Override
- public Iterable<ThreatIntelResults> extract(final DomainName type, Map<String, Object> config) throws IOException {
- List<ThreatIntelResults> ret = new ArrayList<>();
+ public Iterable<LookupKV> extract(final DomainName type, Map<String, Object> config) throws IOException {
+ List<LookupKV> ret = new ArrayList<>();
final DomainNameTypeEnum domainType = type.getType();
- if(SUPPORTED_TYPES.contains(domainType)) {
+ if(domainType == null || SUPPORTED_TYPES.contains(domainType)) {
StringObjectPropertyType value = type.getValue();
for (String token : StixExtractor.split(value)) {
- ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
- new HashMap<String, String>() {{
- put("source-type", "STIX");
- put("indicator-type", "DomainName");
- put("source", type.toXMLString());
- }}
- );
+ LookupKV results = new LookupKV(new ThreatIntelKey(token)
+ , new ThreatIntelValue(
+ new HashMap<String, String>() {{
+ put("source-type", "STIX");
+ put("indicator-type", type.getClass().getSimpleName() + ":" + DomainNameTypeEnum.FQDN);
+ put("source", type.toXMLString());
+ }}
+ )
+ );
ret.add(results);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
index d0c55a9..702c440 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/HostnameHandler.java
@@ -19,7 +19,9 @@
package org.apache.metron.dataloads.extractor.stix.types;
import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.StringObjectPropertyType;
import org.mitre.cybox.objects.Hostname;
@@ -36,17 +38,18 @@ public class HostnameHandler extends AbstractObjectTypeHandler<Hostname>{
}
@Override
- public Iterable<ThreatIntelResults> extract(final Hostname type, Map<String, Object> config) throws IOException {
+ public Iterable<LookupKV> extract(final Hostname type, Map<String, Object> config) throws IOException {
StringObjectPropertyType value = type.getHostnameValue();
- List<ThreatIntelResults> ret = new ArrayList<>();
+ List<LookupKV> ret = new ArrayList<>();
for(String token : StixExtractor.split(value)) {
- ThreatIntelResults results = new ThreatIntelResults(new ThreatIntelKey(token),
- new HashMap<String, String>() {{
+ LookupKV results = new LookupKV(new ThreatIntelKey(token)
+ , new ThreatIntelValue(new HashMap<String, String>() {{
put("source-type", "STIX");
- put("indicator-type", "Hostname");
+ put("indicator-type", type.getClass().getSimpleName());
put("source", type.toXMLString());
}}
- );
+ )
+ );
ret.add(results);
}
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
index f639be0..e5a5296 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/extractor/stix/types/ObjectTypeHandler.java
@@ -18,6 +18,7 @@
package org.apache.metron.dataloads.extractor.stix.types;
import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.mitre.cybox.common_2.ObjectPropertiesType;
@@ -25,6 +26,6 @@ import java.io.IOException;
import java.util.Map;
public interface ObjectTypeHandler<T extends ObjectPropertiesType> {
- Iterable<ThreatIntelResults> extract(T type, Map<String, Object> config) throws IOException;
+ Iterable<LookupKV> extract(T type, Map<String, Object> config) throws IOException;
Class<T> getTypeClass();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
index 0ad09e6..04714d9 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapper.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
import java.io.IOException;
@@ -34,9 +36,11 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
public static final String CONFIG_KEY="bl_extractor_config";
public static final String COLUMN_FAMILY_KEY = "bl_column_family";
public static final String LAST_SEEN_KEY = "bl_last_seen";
+ public static final String CONVERTER_KEY = "bl_converter";
Extractor extractor = null;
String columnFamily = null;
Long lastSeen = null;
+ HbaseConverter converter;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
@@ -45,19 +49,28 @@ public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable,
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- for(ThreatIntelResults results : extractor.extract(value.toString())) {
+ for(LookupKV results : extractor.extract(value.toString())) {
if (results != null) {
- Put put = Converter.INSTANCE.toPut(columnFamily, results.getKey(), results.getValue(), lastSeen);
+ Put put = converter.toPut(columnFamily, results.getKey(), results.getValue());
write(new ImmutableBytesWritable(results.getKey().toBytes()), put, context);
}
}
}
- protected void initialize(Configuration configuration) throws IOException {
+ protected void initialize(Configuration configuration) throws IOException{
String configStr = configuration.get(CONFIG_KEY);
extractor = ExtractorHandler.load(configStr).getExtractor();
columnFamily = configuration.get(COLUMN_FAMILY_KEY);
lastSeen = Long.parseLong(configuration.get(LAST_SEEN_KEY));
+ try {
+ converter = (HbaseConverter) Class.forName(configuration.get(CONVERTER_KEY)).newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to create converter object: " + configuration.get(CONVERTER_KEY), e);
+ }
}
protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
index 3e19b07..bf33eed 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -70,6 +70,11 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
public byte[] toBytes() {
return bytes;
}
+
+ @Override
+ public void fromBytes(byte[] in) {
+ // No-op: the supplied bytes are ignored. NOTE(review): presumably this wrapper is only ever serialized through toBytes() - confirm.
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
new file mode 100644
index 0000000..1e95507
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/ConnectionType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+ /**
+  * How the configured TAXII endpoint is to be used when a connection is set up.
+  */
+ public enum ConnectionType {
+   /** Use the configured endpoint without a discovery step. */
+   POLL,
+   /** Send a discovery request to the configured endpoint to locate the poll service. */
+   DISCOVER
+ }
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
new file mode 100644
index 0000000..ddf542e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TableInfo.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+ /**
+  * Immutable pairing of a table name and a column family, parsed from a
+  * {@code table:columnFamily} specification. Instances are usable as hash keys.
+  */
+ public class TableInfo {
+   private final String tableName;
+   private final String columnFamily;
+
+   /**
+    * Parses a {@code table:cf} specification.
+    *
+    * @param s text containing exactly one {@code ':'}; either side may be empty
+    * @throws IllegalStateException if {@code s} does not split into exactly two parts
+    * @throws NullPointerException if {@code s} is {@code null}
+    */
+   public TableInfo(String s) {
+     // Split once. The negative limit keeps trailing empty strings, so "t:" still
+     // yields two parts, matching a splitter that does not omit empty tokens.
+     String[] parts = s.split(":", -1);
+     if (parts.length != 2) {
+       throw new IllegalStateException("Malformed table:cf => " + s);
+     }
+     tableName = parts[0];
+     columnFamily = parts[1];
+   }
+
+   /** @return the part of the specification before the colon */
+   public String getTableName() {
+     return tableName;
+   }
+
+   /** @return the part of the specification after the colon */
+   public String getColumnFamily() {
+     return columnFamily;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     TableInfo other = (TableInfo) o;
+     // Both fields are always non-null once the constructor has succeeded.
+     return tableName.equals(other.tableName) && columnFamily.equals(other.columnFamily);
+   }
+
+   @Override
+   public int hashCode() {
+     return 31 * tableName.hashCode() + columnFamily.hashCode();
+   }
+
+   @Override
+   public String toString() {
+     return "TableInfo{" +
+         "tableName='" + tableName + '\'' +
+         ", columnFamily='" + columnFamily + '\'' +
+         '}';
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
new file mode 100644
index 0000000..dab8f0c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiConnectionConfig.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+ /**
+  * Connection settings for a TAXII feed. Instances can be populated fluently through the
+  * {@code withXxx} methods, through the bean setters, or by deserializing a document with
+  * one of the {@code load} methods.
+  */
+ public class TaxiiConnectionConfig {
+   final static ObjectMapper _mapper = new ObjectMapper();
+   private URL endpoint;
+   private int port = 443;
+   private URL proxy;
+   private String username;
+   private String password;
+   private ConnectionType type;
+   private String collection = "default";
+   private String subscriptionId = null;
+   private Date beginTime;
+   private Map<String, TableInfo> tableMap;
+
+   // ---- fluent configuration: each method stores its argument and returns this ----
+
+   public TaxiiConnectionConfig withTableMap(Map<String, TableInfo> tableMap) {
+     this.tableMap = tableMap;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withBeginTime(Date time) {
+     this.beginTime = time;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withSubscriptionId(String subId) {
+     this.subscriptionId = subId;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withCollection(String collection) {
+     this.collection = collection;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withPort(int port) {
+     this.port = port;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withEndpoint(URL endpoint) {
+     this.endpoint = endpoint;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withProxy(URL proxy) {
+     this.proxy = proxy;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withUsername(String username) {
+     this.username = username;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withPassword(String password) {
+     this.password = password;
+     return this;
+   }
+
+   public TaxiiConnectionConfig withConnectionType(ConnectionType type) {
+     this.type = type;
+     return this;
+   }
+
+   // ---- bean setters: these accept textual forms for the URL, date and table map ----
+
+   /**
+    * @param endpoint textual URL of the endpoint
+    * @throws MalformedURLException if the text is not a valid URL
+    */
+   public void setEndpoint(String endpoint) throws MalformedURLException {
+     this.endpoint = new URL(endpoint);
+   }
+
+   public void setPort(int port) {
+     this.port = port;
+   }
+
+   /**
+    * @param proxy textual URL of the proxy
+    * @throws MalformedURLException if the text is not a valid URL
+    */
+   public void setProxy(String proxy) throws MalformedURLException {
+     this.proxy = new URL(proxy);
+   }
+
+   public void setUsername(String username) {
+     this.username = username;
+   }
+
+   public void setPassword(String password) {
+     this.password = password;
+   }
+
+   public void setType(ConnectionType type) {
+     this.type = type;
+   }
+
+   public void setCollection(String collection) {
+     this.collection = collection;
+   }
+
+   public void setSubscriptionId(String subscriptionId) {
+     this.subscriptionId = subscriptionId;
+   }
+
+   /**
+    * Parses the begin time using the default locale's {@link DateFormat#MEDIUM} date style.
+    *
+    * @param beginTime the date text to parse
+    * @throws ParseException if the text does not match that date style
+    */
+   public void setBeginTime(String beginTime) throws ParseException {
+     // Use the DateFormat as returned: the factory is not guaranteed to hand back a
+     // SimpleDateFormat for every locale, and only parse() is needed here.
+     DateFormat format = DateFormat.getDateInstance(DateFormat.MEDIUM);
+     this.beginTime = format.parse(beginTime);
+   }
+
+   /**
+    * Replaces the table map, converting every {@code table:cf} value into a {@link TableInfo}.
+    *
+    * @param tableMap map from key to {@code table:cf} text
+    */
+   public void setTableMap(Map<String, String> tableMap) {
+     this.tableMap = new HashMap<>();
+     for (Map.Entry<String, String> kv : tableMap.entrySet()) {
+       this.tableMap.put(kv.getKey(), new TableInfo(kv.getValue()));
+     }
+   }
+
+   // ---- getters ----
+
+   public Map<String, TableInfo> getTableMap() {
+     return tableMap;
+   }
+
+   public Date getBeginTime() {
+     return beginTime;
+   }
+
+   public int getPort() {
+     return port;
+   }
+
+   public URL getEndpoint() {
+     return endpoint;
+   }
+
+   public URL getProxy() {
+     return proxy;
+   }
+
+   public String getUsername() {
+     return username;
+   }
+
+   public String getPassword() {
+     return password;
+   }
+
+   public ConnectionType getType() {
+     return type;
+   }
+
+   public String getCollection() {
+     return collection;
+   }
+
+   public String getSubscriptionId() {
+     return subscriptionId;
+   }
+
+   // ---- loading ----
+
+   /**
+    * Deserializes a configuration from a stream using the shared object mapper.
+    *
+    * @param is stream holding the serialized configuration
+    * @return the populated configuration
+    * @throws IOException if the stream cannot be read or mapped
+    */
+   public static synchronized TaxiiConnectionConfig load(InputStream is) throws IOException {
+     TaxiiConnectionConfig ret = _mapper.readValue(is, TaxiiConnectionConfig.class);
+     return ret;
+   }
+
+   /**
+    * Deserializes a configuration from text encoded with the given charset.
+    *
+    * @param s the serialized configuration
+    * @param c charset used to turn the text into bytes
+    * @return the populated configuration
+    * @throws IOException if the text cannot be mapped
+    */
+   public static synchronized TaxiiConnectionConfig load(String s, Charset c) throws IOException {
+     return load(new ByteArrayInputStream(s.getBytes(c)));
+   }
+
+   /**
+    * Deserializes a configuration from text using the platform default charset.
+    *
+    * @param s the serialized configuration
+    * @return the populated configuration
+    * @throws IOException if the text cannot be mapped
+    */
+   public static synchronized TaxiiConnectionConfig load(String s) throws IOException {
+     return load(s, Charset.defaultCharset());
+   }
+
+   /** Renders every setting; a non-null password is masked rather than printed. */
+   @Override
+   public String toString() {
+     return "TaxiiConnectionConfig{" +
+         "endpoint=" + endpoint +
+         ", port=" + port +
+         ", proxy=" + proxy +
+         ", username='" + username + '\'' +
+         ", password=" + (password == null?"null" : "'******'") +
+         ", type=" + type +
+         ", collection='" + collection + '\'' +
+         ", subscriptionId='" + subscriptionId + '\'' +
+         ", beginTime=" + beginTime +
+         ", tableMap=" + tableMap +
+         '}';
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
new file mode 100644
index 0000000..614adec
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/taxii/TaxiiHandler.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.taxii;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.AuthCache;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContextBuilder;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.BasicAuthCache;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.mitre.taxii.client.HttpClient;
+import org.mitre.taxii.messages.xml11.*;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class TaxiiHandler extends TimerTask {
+ private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
+
+ // Each thread lazily receives its own TaxiiXmlFactory instance.
+ private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() {
+ @Override
+ protected TaxiiXmlFactory initialValue() {
+ return new TaxiiXmlFactory();
+ }
+ };
+ // Each thread lazily receives its own ObjectFactory; it is used to create the poll and discovery request messages.
+ private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() {
+ @Override
+ protected ObjectFactory initialValue() {
+ return new ObjectFactory();
+ }
+ };
+
+ private HttpClient taxiiClient;
+ // Endpoint that poll requests are sent to.
+ private URL endpoint;
+ private Extractor extractor;
+ // Looked up by the "indicator-type" metadata value of each extracted entry.
+ private Map<String, TableInfo> tableMap;
+ // Tables already opened, keyed by their TableInfo, so each is created once.
+ private Map<TableInfo, HTableInterface> connectionCache = new HashMap<>();
+ private HttpClientContext context;
+ private String collection;
+ private String subscriptionId;
+ private ThreatIntelConverter converter = new ThreatIntelConverter();
+ // Exclusive lower time bound for the next poll; run() replaces it with that run's start time.
+ private Date beginTime;
+ private Configuration config;
+ // Set while run() is executing so that a second invocation returns immediately.
+ private boolean inProgress = false;
+ /**
+  * Captures the connection settings and initializes the TAXII client.
+  *
+  * @param connectionConfig collection, subscription, table map, begin time and endpoint settings
+  * @param extractor turns each received document into lookup key/value pairs
+  * @param config configuration used when tables are opened
+  * @throws Exception if the client cannot be initialized
+  */
+ public TaxiiHandler(TaxiiConnectionConfig connectionConfig, Extractor extractor, Configuration config)
+     throws Exception {
+   LOG.info("Loading configuration: " + connectionConfig);
+   this.config = config;
+   this.extractor = extractor;
+   this.tableMap = connectionConfig.getTableMap();
+   this.beginTime = connectionConfig.getBeginTime();
+   this.subscriptionId = connectionConfig.getSubscriptionId();
+   // The collection must be in place before the client is initialized: discovery reads it.
+   this.collection = connectionConfig.getCollection();
+   initializeClient(connectionConfig);
+   LOG.info("Configured, starting polling " + endpoint + " for " + collection);
+ }
+
+ /**
+  * Returns the table for the given descriptor, creating and caching it on first use.
+  *
+  * @param tableInfo descriptor of the wanted table
+  * @return the cached table, or a newly created one that has just been cached
+  * @throws IOException if the table has to be created and creation fails
+  */
+ protected synchronized HTableInterface getTable(TableInfo tableInfo) throws IOException {
+   HTableInterface cached = connectionCache.get(tableInfo);
+   if (cached != null) {
+     return cached;
+   }
+   HTableInterface created = createHTable(tableInfo);
+   connectionCache.put(tableInfo, created);
+   return created;
+ }
+
+ /**
+  * Opens the table named by the descriptor using this handler's configuration.
+  *
+  * @param tableInfo descriptor whose table name is opened
+  * @return a new table handle
+  * @throws IOException if the table cannot be opened
+  */
+ protected synchronized HTableInterface createHTable(TableInfo tableInfo) throws IOException {
+   final String name = tableInfo.getTableName();
+   return new HTable(config, name);
+ }
+ /**
+  * The action to be performed by this timer task: sends one poll request for the configured
+  * collection, extracts lookup entries from every returned XML element, and writes each
+  * entry whose "indicator-type" has a table mapping.
+  *
+  * <p>Returns immediately if a previous invocation is still in progress. Whatever the
+  * outcome, the begin time for the next poll becomes the start time of this invocation.
+  *
+  * @throws RuntimeException if the poll request or the processing of its response fails
+  */
+ @Override
+ public void run() {
+   if (inProgress) {
+     return;
+   }
+   Date ts = new Date();
+   LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+   try {
+     inProgress = true;
+     // Prepare the message to send.
+     String sessionID = MessageHelper.generateMessageId();
+     PollRequest request = messageFactory.get().createPollRequest()
+         .withMessageId(sessionID)
+         .withCollectionName(collection);
+     if (subscriptionId != null) {
+       request = request.withSubscriptionID(subscriptionId);
+     } else {
+       request = request.withPollParameters(messageFactory.get().createPollParametersType());
+     }
+     if (beginTime != null) {
+       // Build a GregorianCalendar directly: Calendar.getInstance() may return another
+       // calendar system for some default locales, which would make a cast fail.
+       GregorianCalendar gc = new GregorianCalendar();
+       gc.setTime(beginTime);
+       try {
+         XMLGregorianCalendar gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(gc).normalize();
+         gTime.setFractionalSecond(null);
+         LOG.info("Begin Time: " + gTime);
+         request.setExclusiveBeginTimestamp(gTime);
+       } catch (DatatypeConfigurationException e) {
+         // Best effort: without a usable timestamp the request is sent with no lower
+         // bound instead of dereferencing a null calendar.
+         LOG.error("Unable to set the begin time", e);
+       }
+     }
+
+     try {
+       PollResponse response = call(request, PollResponse.class);
+       LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
+       int numProcessed = 0;
+       long avgTimeMS = 0;
+       long timeStartedBlock = System.currentTimeMillis();
+       for (ContentBlock block : response.getContentBlocks()) {
+         AnyMixedContentType content = block.getContent();
+         for (Object o : content.getContent()) {
+           numProcessed++;
+           long timeS = System.currentTimeMillis();
+           if (o instanceof Element) {
+             Element element = (Element) o;
+             String xml = getStringFromDocument(element.getOwnerDocument());
+             // Sample roughly one document in a hundred into the debug log.
+             if (LOG.isDebugEnabled() && Math.random() < 0.01) {
+               LOG.debug("Random Stix doc: " + xml);
+             }
+             for (LookupKV<ThreatIntelKey, ThreatIntelValue> kv : extractor.extract(xml)) {
+               String indicatorType = kv.getValue().getMetadata().get("indicator-type");
+               TableInfo tableInfo = tableMap.get(indicatorType);
+               boolean persisted = false;
+               // Entries whose indicator type has no table mapping are logged but not written.
+               if (tableInfo != null) {
+                 kv.getValue().getMetadata().put("source_type", "taxii");
+                 kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
+                 kv.getValue().getMetadata().put("taxii_collection", collection);
+                 Put p = converter.toPut(tableInfo.getColumnFamily(), kv.getKey(), kv.getValue());
+                 HTableInterface table = getTable(tableInfo);
+                 table.put(p);
+                 persisted = true;
+               }
+               LOG.info("Found Threat Intel: " + persisted + ", " + kv.getKey() + " => " + kv.getValue());
+             }
+           }
+           avgTimeMS += System.currentTimeMillis() - timeS;
+         }
+         // Report progress once at least 100 items have accumulated. The average divides
+         // by the number of items actually timed, which is non-zero here.
+         if (numProcessed >= 100) {
+           LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + avgTimeMS / numProcessed);
+           timeStartedBlock = System.currentTimeMillis();
+           avgTimeMS = 0;
+           numProcessed = 0;
+         }
+       }
+     } catch (Exception e) {
+       LOG.error(e.getMessage(), e);
+       throw new RuntimeException("Unable to make request", e);
+     }
+   }
+   finally {
+     inProgress = false;
+     beginTime = ts;
+   }
+ }
+ /**
+  * Serializes a DOM document to XML text using a default transformer.
+  *
+  * @param doc the document to serialize
+  * @return the XML text, or {@code null} if the transformation fails
+  */
+ public String getStringFromDocument(Document doc)
+ {
+   try
+   {
+     StringWriter writer = new StringWriter();
+     Transformer transformer = TransformerFactory.newInstance().newTransformer();
+     transformer.transform(new DOMSource(doc), new StreamResult(writer));
+     return writer.toString();
+   }
+   catch(TransformerException ex)
+   {
+     // Report through the class logger rather than stderr so the failure is recorded
+     // with the rest of this handler's output; callers still receive null.
+     LOG.error("Unable to serialize document to XML", ex);
+     return null;
+   }
+ }
+ /**
+  * Sends a request to this handler's endpoint with its own client and context.
+  *
+  * @param request the message to send
+  * @param responseClazz the expected response type
+  * @return the response, typed as {@code responseClazz}
+  */
+ private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
+   final URI target = endpoint.toURI();
+   return call(taxiiClient, target, request, context, responseClazz);
+ }
+
+ /**
+  * Creates the HTTP context if there is none yet, resolves the endpoint to poll, and builds
+  * the TAXII client.
+  *
+  * <p>For {@link ConnectionType#DISCOVER} the configured endpoint is queried for its poll
+  * service; for any other type the configured endpoint is used as given.
+  *
+  * @param config connection settings: endpoint, proxy, credentials, port and type
+  * @throws Exception if the context, the discovery exchange or the client cannot be set up
+  */
+ private void initializeClient(TaxiiConnectionConfig config) throws Exception {
+   LOG.info("Initializing client..");
+   if(context == null) {
+     context = createContext(config.getEndpoint(), config.getUsername(), config.getPassword(), config.getPort());
+   }
+   URL resolved = config.getEndpoint();
+   if(config.getType() == ConnectionType.DISCOVER) {
+     LOG.info("Discovering endpoint");
+     resolved = discoverPollingClient(config.getProxy(), resolved, config.getUsername(), config.getPassword(), context, collection).pollEndpoint;
+     LOG.info("Discovered endpoint as " + resolved);
+   }
+   // Record the endpoint for every connection type. Assigning it only after discovery
+   // left the field null for direct polling, and requests need it to build their URI.
+   this.endpoint = resolved;
+   taxiiClient = buildClient(config.getProxy(), config.getUsername(), config.getPassword());
+ }
+
+ /** Mutable holder for the service addresses and collection names gathered during discovery. */
+ private static class DiscoveryResults {
+   /** Names of the collections reported by the collection management service. */
+   List<String> collections = new ArrayList<String>();
+   /** Address of an available poll service; null until one is recorded. */
+   URL pollEndpoint;
+   /** Address of an available collection management service; null until one is recorded. */
+   URL collectionManagementEndpoint;
+ }
+ /**
+  * Queries a TAXII discovery service and records the poll and collection management
+  * endpoints it advertises as available.
+  *
+  * If {@code defaultCollection} is null, the collection management service is additionally
+  * asked for its collections, their names are logged, and the JVM is terminated via
+  * {@code System.exit(0)}; in that case the method does not return.
+  *
+  * @param proxy optional proxy passed to {@code buildClient}; may be null
+  * @param endpoint address of the discovery service
+  * @param username user name passed to {@code buildClient}
+  * @param password password passed to {@code buildClient}
+  * @param context HTTP context used for the requests
+  * @param defaultCollection collection the caller intends to poll, or null to list the available ones
+  * @return the discovered endpoints
+  * @throws RuntimeException if no available poll service is advertised, or if collections must
+  *         be listed but no available collection management service is advertised
+  * @throws Exception if building the client or calling the service fails
+  */
+ private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+
+   DiscoveryResults results = new DiscoveryResults();
+   {
+     HttpClient discoverClient = buildClient(proxy, username, password);
+     String sessionID = MessageHelper.generateMessageId();
+     // Prepare the message to send.
+     DiscoveryRequest request = messageFactory.get().createDiscoveryRequest()
+       .withMessageId(sessionID);
+     DiscoveryResponse response = call(discoverClient, endpoint.toURI(), request, context, DiscoveryResponse.class);
+     // When several available instances share a service type, the last one listed wins.
+     for (ServiceInstanceType serviceInstance : response.getServiceInstances()) {
+       if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
+         results.pollEndpoint = new URL(serviceInstance.getAddress());
+       }
+       else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
+         results.collectionManagementEndpoint= new URL(serviceInstance.getAddress());
+       }
+     }
+     if (results.pollEndpoint == null) {
+       throw new RuntimeException("Unable to discover a poll TAXII feed");
+     }
+   }
+   if(defaultCollection == null) {
+     //get collections
+     if (results.collectionManagementEndpoint == null) {
+       // Fail with a clear message instead of a bare NullPointerException on toURI() below.
+       throw new RuntimeException("Unable to discover a collection management TAXII service to list the available collections");
+     }
+     HttpClient discoverClient = buildClient(proxy, username, password);
+     String sessionID = MessageHelper.generateMessageId();
+     CollectionInformationRequest request = messageFactory.get().createCollectionInformationRequest()
+       .withMessageId(sessionID);
+     CollectionInformationResponse response = call(discoverClient, results.collectionManagementEndpoint.toURI(), request, context, CollectionInformationResponse.class);
+     LOG.info("Unable to find the default collection; available collections are:");
+     for(CollectionRecordType c : response.getCollections()) {
+       LOG.info(c.getCollectionName());
+       results.collections.add(c.getCollectionName());
+     }
+     // Ends the process once the names have been logged; nothing is returned to the caller.
+     System.exit(0);
+   }
+   return results;
+ }
+
+ /**
+  * Builds an HTTP client context carrying Basic credentials for the endpoint's host.
+  *
+  * @param endpoint URL whose host and protocol identify the target
+  * @param username user name, or null for no authentication
+  * @param password password, or null for no authentication
+  * @param port port of the target host
+  * @return a context with a credentials provider and auth cache set, or {@code null} when
+  *         either the user name or the password is missing
+  */
+ private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
+   HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
+   if (username == null || password == null) {
+     // Without a complete credential pair there is nothing to put in a context.
+     return null;
+   }
+
+   CredentialsProvider credentials = new BasicCredentialsProvider();
+   AuthScope scope = new AuthScope(target.getHostName(), target.getPort());
+   credentials.setCredentials(scope, new UsernamePasswordCredentials(username, password));
+
+   // Seed the auth cache with a Basic scheme for the target host, as described in
+   // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
+   AuthCache cache = new BasicAuthCache();
+   cache.put(target, new BasicScheme());
+
+   HttpClientContext authContext = HttpClientContext.create();
+   authContext.setCredentialsProvider(credentials);
+   authContext.setAuthCache(cache);
+   return authContext;
+ }
+
+
+ public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
+ , URI endpoint
+ , REQUEST_T request
+ , HttpClientContext context
+ , Class<RESPONSE_T> responseClazz
+ ) throws JAXBException, IOException {
+ //TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+ //String req = taxiiXml.marshalToString(request, true);
+ // Call the service
+ Object responseObj = taxiiClient.callTaxiiService(endpoint, request, context);
+ LOG.info("Request made : " + request.getClass().getCanonicalName() + " => " + responseObj.getClass().getCanonicalName() + " (expected " + responseClazz.getCanonicalName() + ")");
+ //String resp = taxiiXml.marshalToString(responseObj, true);
+ try {
+ return responseClazz.cast(responseObj);
+ }
+ catch(ClassCastException cce) {
+ TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+ String resp = taxiiXml.marshalToString(responseObj, true);
+ String msg = "Didn't return the response we expected: " + responseObj.getClass() + " \n" + resp;
+ LOG.error(msg, cce);
+ throw new RuntimeException(msg, cce);
+ }
+ }
+ /**
+  * Creates a TAXII HTTP client backed by a pooled Apache HttpClient.
+  *
+  * The user name and password are only checked for being supplied together; they are not
+  * applied to the client here. Note that this method also sets the JVM-wide system property
+  * {@code jsse.enableSNIExtension} to {@code "false"}.
+  *
+  * @param proxy optional proxy to route requests through; may be null
+  * @param username user name, or null
+  * @param password password, or null
+  * @return the configured TAXII client
+  * @throws Exception if exactly one of user name and password is given, or if the SSL
+  *         context cannot be built
+  */
+ private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
+ {
+   // Default TAXII client; its underlying Apache client is swapped in at the end.
+   HttpClient taxiiHttpClient = new HttpClient();
+   HttpClientBuilder httpBuilder = HttpClientBuilder.create().useSystemProperties();
+
+   if (proxy != null) {
+     httpBuilder.setProxy(new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol()));
+   }
+
+   // Credentials must be given as a pair or not at all.
+   boolean hasUsername = username != null;
+   boolean hasPassword = password != null;
+   if (hasUsername != hasPassword) {
+     throw new Exception("'username' and 'password' arguments are required to appear together.");
+   }
+
+   // Accept self-signed certificates.
+   // from: http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
+   SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
+   sslContextBuilder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+   SSLConnectionSocketFactory sslFactory = new SSLConnectionSocketFactory(sslContextBuilder.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+   Registry<ConnectionSocketFactory> socketFactories = RegistryBuilder.<ConnectionSocketFactory>create()
+     .register("http", new PlainConnectionSocketFactory())
+     .register("https", sslFactory)
+     .build();
+
+   PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager(socketFactories);
+   pool.setMaxTotal(20); // upper bound on pooled connections
+
+   System.setProperty("jsse.enableSNIExtension", "false");
+   CloseableHttpClient apacheClient = httpBuilder
+     .setSSLSocketFactory(sslFactory)
+     .setConnectionManager(pool)
+     .build();
+
+   taxiiHttpClient.setHttpclient(apacheClient);
+   return taxiiHttpClient;
+ }
+ /**
+  * Manual-testing entry point: builds a DISCOVER configuration for a hard-coded discovery
+  * service with guest credentials. The handler invocation itself is currently disabled,
+  * so running this only constructs the configuration.
+  *
+  * @param argv ignored
+  */
+ public static void main(String... argv) throws Exception {
+   URL discoveryService = new URL("http://hailataxii.com/taxii-discovery-service");
+   TaxiiConnectionConfig config = new TaxiiConnectionConfig();
+   config = config
+     .withConnectionType(ConnectionType.DISCOVER)
+     .withEndpoint(discoveryService)
+     .withUsername("guest")
+     .withCollection("guest.Abuse_ch")
+     .withPassword("guest");
+   // Disabled: new TaxiiHandler(config, null).run();
+ }
+}