You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:53 UTC
[11/43] incubator-metron git commit: METRON-50: Ingest threat intel
data from Taxii feeds closes apache/incubator-metron#29
METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/0e1055aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/0e1055aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/0e1055aa
Branch: refs/heads/Metron_0.1BETA
Commit: 0e1055aafe78e428b0ad1fc085589a202cf14206
Parents: a712fe6
Author: cstella <ce...@gmail.com>
Authored: Thu Feb 25 11:24:56 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Thu Feb 25 11:24:56 2016 -0500
----------------------------------------------------------------------
NOTICE | 3 +
metron-streaming/Metron-Common/pom.xml | 6 +
.../hbase/converters/AbstractConverter.java | 93 +
.../metron/hbase/converters/HbaseConverter.java | 40 +
.../threatintel/ThreatIntelConverter.java | 39 +
.../converters/threatintel/ThreatIntelKey.java | 89 +
.../threatintel/ThreatIntelValue.java | 110 +
.../metron/reference/lookup/LookupKV.java | 65 +
.../metron/reference/lookup/LookupKey.java | 1 +
.../metron/reference/lookup/LookupValue.java | 28 +
.../metron/threatintel/ThreatIntelKey.java | 89 -
.../metron/threatintel/ThreatIntelResults.java | 2 +
.../metron/threatintel/hbase/Converter.java | 96 -
.../threatintel/hbase/ThreatIntelLookup.java | 24 +-
metron-streaming/Metron-DataLoads/pom.xml | 432 +--
.../src/main/bash/threatintel_bulk_load.sh | 2 +-
.../src/main/bash/threatintel_bulk_prune.sh | 2 +-
.../src/main/bash/threatintel_taxii_load.sh | 39 +
.../dataloads/LeastRecentlyUsedPruner.java | 221 --
.../metron/dataloads/ThreatIntelBulkLoader.java | 213 --
.../dataloads/bulk/LeastRecentlyUsedPruner.java | 221 ++
.../dataloads/bulk/ThreatIntelBulkLoader.java | 230 ++
.../metron/dataloads/extractor/Extractor.java | 4 +-
.../dataloads/extractor/csv/CSVExtractor.java | 20 +-
.../extractor/csv/LookupConverter.java | 30 +
.../extractor/csv/LookupConverters.java | 67 +
.../dataloads/extractor/stix/StixExtractor.java | 44 +-
.../extractor/stix/types/AddressHandler.java | 18 +-
.../extractor/stix/types/DomainHandler.java | 26 +-
.../extractor/stix/types/HostnameHandler.java | 17 +-
.../extractor/stix/types/ObjectTypeHandler.java | 3 +-
.../dataloads/hbase/mr/BulkLoadMapper.java | 21 +-
.../metron/dataloads/hbase/mr/PrunerMapper.java | 5 +
.../metron/dataloads/taxii/ConnectionType.java | 23 +
.../metron/dataloads/taxii/TableInfo.java | 72 +
.../dataloads/taxii/TaxiiConnectionConfig.java | 196 ++
.../metron/dataloads/taxii/TaxiiHandler.java | 403 +++
.../metron/dataloads/taxii/TaxiiLoader.java | 180 ++
.../dataloads/extractor/ExtractorTest.java | 24 +-
.../extractor/csv/CSVExtractorTest.java | 17 +-
.../extractor/stix/StixExtractorTest.java | 25 +-
.../dataloads/hbase/HBaseConverterTest.java | 67 -
.../hbase/HBaseThreatIntelConverterTest.java | 76 +
.../hbase/mr/BulkLoadMapperIntegrationTest.java | 24 +-
.../dataloads/hbase/mr/BulkLoadMapperTest.java | 19 +-
.../LeastRecentlyUsedPrunerIntegrationTest.java | 32 +-
.../dataloads/taxii/MockTaxiiService.java | 99 +
.../dataloads/taxii/TaxiiIntegrationTest.java | 121 +
.../resources/taxii-messages/message.discovery | 21 +
.../test/resources/taxii-messages/messages.poll | 2914 ++++++++++++++++++
.../metron/threatintel/ThreatIntelAdapter.java | 5 +-
metron-streaming/Metron-Indexing/pom.xml | 8 +-
metron-streaming/Metron-Testing/pom.xml | 88 +
.../metron/integration/util/UnitTestHelper.java | 84 +
.../util/integration/ComponentRunner.java | 130 +
.../util/integration/InMemoryComponent.java | 23 +
.../integration/util/integration/Processor.java | 23 +
.../util/integration/ReadinessState.java | 22 +
.../integration/UnableToStartException.java | 27 +
.../components/ElasticSearchComponent.java | 188 ++
.../components/FluxTopologyComponent.java | 132 +
.../integration/util/mock/MockHTable.java | 672 ++++
metron-streaming/Metron-Topologies/pom.xml | 6 +
.../integration/pcap/PcapIntegrationTest.java | 29 +-
.../metron/integration/util/UnitTestHelper.java | 84 -
.../util/integration/ComponentRunner.java | 127 -
.../util/integration/InMemoryComponent.java | 23 -
.../integration/util/integration/Processor.java | 23 -
.../util/integration/ReadinessState.java | 22 -
.../integration/UnableToStartException.java | 27 -
.../components/ElasticSearchComponent.java | 188 --
.../components/FluxTopologyComponent.java | 132 -
.../integration/util/mock/MockHTable.java | 673 ----
.../util/threatintel/ThreatIntelHelper.java | 12 +-
metron-streaming/pom.xml | 1 +
pom.xml | 2 +
76 files changed, 7034 insertions(+), 2330 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9505518..ec99233 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,6 @@
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+
+ This product includes Jersey Client: https://jersey.java.net/
+ License: Common Development and Distribution License (CDDL) v1.0 (https://glassfish.dev.java.net/public/CDDLv1.0.html)
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index c9918a2..57a58d7 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -132,6 +132,12 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${global_hbase_version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
new file mode 100644
index 0000000..9072c22
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/AbstractConverter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+
+
+public abstract class AbstractConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements HbaseConverter<KEY_T,VALUE_T> {
+ public static Function<Cell, Map.Entry<byte[], byte[]>> CELL_TO_ENTRY = new Function<Cell, Map.Entry<byte[], byte[]>>() {
+
+ @Nullable
+ @Override
+ public Map.Entry<byte[], byte[]> apply(@Nullable Cell cell) {
+ return new AbstractMap.SimpleEntry<byte[], byte[]>(cell.getQualifier(), cell.getValue());
+ }
+ };
+ @Override
+ public Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+ Put put = new Put(key.toBytes());
+ byte[] cf = Bytes.toBytes(columnFamily);
+ for(Map.Entry<byte[], byte[]> kv : values.toColumns()) {
+ put.add(cf, kv.getKey(), kv.getValue());
+ }
+ return put;
+ }
+
+ public LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+ key.fromBytes(put.getRow());
+ byte[] cf = Bytes.toBytes(columnFamily);
+ value.fromColumns(Iterables.transform(put.getFamilyCellMap().get(cf), CELL_TO_ENTRY));
+ return new LookupKV<>(key, value);
+ }
+
+ @Override
+ public Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException {
+ Put put = toPut(columnFamily, key, values);
+ return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
+ }
+
+ public LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily, KEY_T key, VALUE_T value) throws IOException {
+ key.fromBytes(result.getRow());
+ byte[] cf = Bytes.toBytes(columnFamily);
+ NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
+ value.fromColumns(cols.entrySet());
+ return new LookupKV<>(key, value);
+ }
+ @Override
+ public Get toGet(String columnFamily, KEY_T key) {
+ Get ret = new Get(key.toBytes());
+ ret.addFamily(Bytes.toBytes(columnFamily));
+ return ret;
+ }
+
+ public static Iterable<Map.Entry<byte[], byte[]>> toEntries(byte[]... kvs) {
+ if(kvs.length % 2 != 0) {
+ throw new IllegalStateException("Must be an even size");
+ }
+ List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>(kvs.length/2);
+ for(int i = 0;i < kvs.length;i += 2) {
+ ret.add(new AbstractMap.SimpleImmutableEntry<>(kvs[i], kvs[i+1])) ;
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
new file mode 100644
index 0000000..449d9cf
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/HbaseConverter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupValue;
+
+import java.io.IOException;
+
+public interface HbaseConverter<KEY_T extends LookupKey, VALUE_T extends LookupValue> {
+ Put toPut(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+ LookupKV<KEY_T, VALUE_T> fromPut(Put put, String columnFamily) throws IOException;
+
+ Result toResult(String columnFamily, KEY_T key, VALUE_T values) throws IOException;
+
+ LookupKV<KEY_T, VALUE_T> fromResult(Result result, String columnFamily) throws IOException;
+
+ Get toGet(String columnFamily, KEY_T key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
new file mode 100644
index 0000000..d534a52
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelConverter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters.threatintel;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.hbase.converters.AbstractConverter;
+import org.apache.metron.reference.lookup.LookupKV;
+
+import java.io.IOException;
+
+public class ThreatIntelConverter extends AbstractConverter<ThreatIntelKey, ThreatIntelValue> {
+
+ @Override
+ public LookupKV<ThreatIntelKey, ThreatIntelValue> fromPut(Put put, String columnFamily) throws IOException {
+ return fromPut(put, columnFamily, new ThreatIntelKey(), new ThreatIntelValue());
+ }
+
+ @Override
+ public LookupKV<ThreatIntelKey, ThreatIntelValue> fromResult(Result result, String columnFamily) throws IOException {
+ return fromResult(result, columnFamily, new ThreatIntelKey(), new ThreatIntelValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
new file mode 100644
index 0000000..3d898d9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelKey.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.converters.threatintel;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.reference.lookup.LookupKey;
+
+public class ThreatIntelKey implements LookupKey{
+ private static final int SEED = 0xDEADBEEF;
+ private static final int HASH_PREFIX_SIZE=16;
+ ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+ @Override
+ protected HashFunction initialValue() {
+ return Hashing.murmur3_128(SEED);
+ }
+ };
+ public ThreatIntelKey() {
+
+ }
+ public ThreatIntelKey(String indicator) {
+ this.indicator = indicator;
+ }
+
+ public String indicator;
+
+ @Override
+ public byte[] toBytes() {
+ byte[] indicatorBytes = Bytes.toBytes(indicator);
+ Hasher hasher = hFunction.get().newHasher();
+ hasher.putBytes(Bytes.toBytes(indicator));
+ byte[] prefix = hasher.hash().asBytes();
+ byte[] val = new byte[indicatorBytes.length + prefix.length];
+ int pos = 0;
+ for(int i = 0;pos < prefix.length;++pos,++i) {
+ val[pos] = prefix[i];
+ }
+ for(int i = 0;i < indicatorBytes.length;++pos,++i) {
+ val[pos] = indicatorBytes[i];
+ }
+ return val;
+ }
+
+ @Override
+ public void fromBytes(byte[] row) {
+ ThreatIntelKey key = this;
+ key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ThreatIntelKey that = (ThreatIntelKey) o;
+
+ return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return indicator != null ? indicator.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "ThreatIntelKey{" +
+ "indicator='" + indicator + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
new file mode 100644
index 0000000..97f0762
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/converters/threatintel/ThreatIntelValue.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.hbase.converters.threatintel;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.converters.AbstractConverter;
+import org.apache.metron.reference.lookup.LookupValue;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ThreatIntelValue implements LookupValue {
+ private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+ @Override
+ protected ObjectMapper initialValue() {
+ return new ObjectMapper();
+ }
+ };
+ public static final String VALUE_COLUMN_NAME = "v";
+ public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
+ public static final String LAST_SEEN_COLUMN_NAME = "t";
+ public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
+
+ private Map<String, String> metadata = null;
+
+ public ThreatIntelValue()
+ {
+
+ }
+
+ public ThreatIntelValue(Map<String, String> metadata) {
+ this.metadata = metadata;
+ }
+
+
+
+ public Map<String, String> getMetadata() {
+ return metadata;
+ }
+
+ @Override
+ public Iterable<Map.Entry<byte[], byte[]>> toColumns() {
+ return AbstractConverter.toEntries( VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(metadata))
+ );
+ }
+
+ @Override
+ public void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values) {
+ for(Map.Entry<byte[], byte[]> cell : values) {
+ if(Bytes.equals(cell.getKey(), VALUE_COLUMN_NAME_B)) {
+ metadata = stringToValue(Bytes.toString(cell.getValue()));
+ }
+ }
+ }
+ public Map<String, String> stringToValue(String s){
+ try {
+ return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to convert string to metadata: " + s);
+ }
+ }
+ public String valueToString(Map<String, String> value) {
+ try {
+ return _mapper.get().writeValueAsString(value);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to convert metadata to string: " + value);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ThreatIntelValue that = (ThreatIntelValue) o;
+
+ return getMetadata() != null ? getMetadata().equals(that.getMetadata()) : that.getMetadata() == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return getMetadata() != null ? getMetadata().hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "ThreatIntelValue{" +
+ "metadata=" + metadata +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
new file mode 100644
index 0000000..eb2b552
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKV.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.reference.lookup;
+
+import java.io.Serializable;
+
+public class LookupKV<KEY_T extends LookupKey, VALUE_T extends LookupValue> implements Serializable {
+ private KEY_T key;
+ private VALUE_T value;
+ public LookupKV(KEY_T key, VALUE_T value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public KEY_T getKey() {
+ return key;
+ }
+
+ public VALUE_T getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LookupKV<?, ?> lookupKV = (LookupKV<?, ?>) o;
+
+ if (key != null ? !key.equals(lookupKV.key) : lookupKV.key != null) return false;
+ return value != null ? value.equals(lookupKV.value) : lookupKV.value == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = key != null ? key.hashCode() : 0;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "LookupKV{" +
+ "key=" + key +
+ ", value=" + value +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
index 8aecd64..c51ed9f 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupKey.java
@@ -19,4 +19,5 @@ package org.apache.metron.reference.lookup;
public interface LookupKey {
byte[] toBytes();
+ void fromBytes(byte[] in);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
new file mode 100644
index 0000000..448f4c9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/reference/lookup/LookupValue.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.metron.reference.lookup;
+
+import java.util.Map;
+import java.util.NavigableMap;
+
+public interface LookupValue {
+ Iterable<Map.Entry<byte[], byte[]>> toColumns();
+ void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
deleted file mode 100644
index 8e90bc7..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelKey.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.reference.lookup.LookupKey;
-
-public class ThreatIntelKey implements LookupKey{
- private static final int SEED = 0xDEADBEEF;
- private static final int HASH_PREFIX_SIZE=16;
- ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
- @Override
- protected HashFunction initialValue() {
- return Hashing.murmur3_128(SEED);
- }
- };
- public ThreatIntelKey() {
-
- }
- public ThreatIntelKey(String indicator) {
- this.indicator = indicator;
- }
-
- public String indicator;
-
- @Override
- public byte[] toBytes() {
- byte[] indicatorBytes = Bytes.toBytes(indicator);
- Hasher hasher = hFunction.get().newHasher();
- hasher.putBytes(Bytes.toBytes(indicator));
- byte[] prefix = hasher.hash().asBytes();
- byte[] val = new byte[indicatorBytes.length + prefix.length];
- int pos = 0;
- for(int i = 0;pos < prefix.length;++pos,++i) {
- val[pos] = prefix[i];
- }
- for(int i = 0;i < indicatorBytes.length;++pos,++i) {
- val[pos] = indicatorBytes[i];
- }
- return val;
- }
-
- public static ThreatIntelKey fromBytes(byte[] row) {
- ThreatIntelKey key = new ThreatIntelKey();
- key.indicator = Bytes.toString(row, HASH_PREFIX_SIZE, row.length - HASH_PREFIX_SIZE);
- return key;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ThreatIntelKey that = (ThreatIntelKey) o;
-
- return indicator != null ? indicator.equals(that.indicator) : that.indicator == null;
-
- }
-
- @Override
- public int hashCode() {
- return indicator != null ? indicator.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return "ThreatIntelKey{" +
- "indicator='" + indicator + '\'' +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
index 08f1cdc..8186b38 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/ThreatIntelResults.java
@@ -17,6 +17,8 @@
*/
package org.apache.metron.threatintel;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+
import java.util.HashMap;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
deleted file mode 100644
index 89a846a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/Converter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel.hbase;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.util.*;
-
-public enum Converter {
- INSTANCE;
- public static final String VALUE_COLUMN_NAME = "v";
- public static final byte[] VALUE_COLUMN_NAME_B = Bytes.toBytes(VALUE_COLUMN_NAME);
- public static final String LAST_SEEN_COLUMN_NAME = "t";
- public static final byte[] LAST_SEEN_COLUMN_NAME_B = Bytes.toBytes(LAST_SEEN_COLUMN_NAME);
- private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
- @Override
- protected ObjectMapper initialValue() {
- return new ObjectMapper();
- }
- };
- public Put toPut(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
- Put put = new Put(key.toBytes());
- byte[] cf = Bytes.toBytes(columnFamily);
- put.add(cf,VALUE_COLUMN_NAME_B, Bytes.toBytes(valueToString(value)));
- put.add(cf, LAST_SEEN_COLUMN_NAME_B, Bytes.toBytes(lastSeenTimestamp));
- return put;
- }
-
- public Map.Entry<ThreatIntelResults, Long> fromPut(Put put, String columnFamily) throws IOException {
- ThreatIntelKey key = ThreatIntelKey.fromBytes(put.getRow());
- Map<String, String> value = null;
- Long lastSeen = null;
- byte[] cf = Bytes.toBytes(columnFamily);
- List<Cell> cells = put.getFamilyCellMap().get(cf);
- for(Cell cell : cells) {
- if(Bytes.equals(cell.getQualifier(), VALUE_COLUMN_NAME_B)) {
- value = stringToValue(Bytes.toString(cell.getValue()));
- }
- else if(Bytes.equals(cell.getQualifier(), LAST_SEEN_COLUMN_NAME_B)) {
- lastSeen = Bytes.toLong(cell.getValue());
- }
- }
- return new AbstractMap.SimpleEntry<>(new ThreatIntelResults(key, value), lastSeen);
- }
-
- public Result toResult(String columnFamily, ThreatIntelKey key, Map<String, String> value, Long lastSeenTimestamp) throws IOException {
- Put put = toPut(columnFamily, key, value, lastSeenTimestamp);
- return Result.create(put.getFamilyCellMap().get(Bytes.toBytes(columnFamily)));
- }
-
- public Map.Entry<ThreatIntelResults, Long> fromResult(Result result, String columnFamily) throws IOException {
- ThreatIntelKey key = ThreatIntelKey.fromBytes(result.getRow());
- byte[] cf = Bytes.toBytes(columnFamily);
- NavigableMap<byte[], byte[]> cols = result.getFamilyMap(cf);
- Map<String, String> value = stringToValue(Bytes.toString(cols.get(VALUE_COLUMN_NAME_B)));
- ThreatIntelResults results = new ThreatIntelResults(key, value);
- return new AbstractMap.SimpleEntry<>(results, Bytes.toLong(cols.get(LAST_SEEN_COLUMN_NAME_B)));
- }
-
- public Get toGet(String columnFamily, ThreatIntelKey key) {
- Get ret = new Get(key.toBytes());
- ret.addFamily(Bytes.toBytes(columnFamily));
- return ret;
- }
-
- public Map<String, String> stringToValue(String s) throws IOException {
- return _mapper.get().readValue(s, new TypeReference<Map<String, String>>(){});
- }
- public String valueToString(Map<String, String> value) throws IOException {
- return _mapper.get().writeValueAsString(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
index c7997fe..13efc42 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/threatintel/hbase/ThreatIntelLookup.java
@@ -17,36 +17,36 @@
*/
package org.apache.metron.threatintel.hbase;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.converters.HbaseConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
import org.apache.metron.reference.lookup.Lookup;
-import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
-import org.apache.metron.reference.lookup.handler.Handler;
-import org.apache.metron.threatintel.ThreatIntelKey;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import java.io.Closeable;
import java.io.IOException;
-import java.util.Map;
-public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> implements AutoCloseable {
+public class ThreatIntelLookup extends Lookup<HTableInterface, ThreatIntelKey, LookupKV<ThreatIntelKey,ThreatIntelValue>> implements AutoCloseable {
- public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface, ThreatIntelKey, Map.Entry<ThreatIntelResults, Long>> {
+
+ public static class Handler implements org.apache.metron.reference.lookup.handler.Handler<HTableInterface,ThreatIntelKey,LookupKV<ThreatIntelKey,ThreatIntelValue>> {
String columnFamily;
+ HbaseConverter<ThreatIntelKey, ThreatIntelValue> converter = new ThreatIntelConverter();
public Handler(String columnFamily) {
this.columnFamily = columnFamily;
}
@Override
public boolean exists(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
- return table.exists(Converter.INSTANCE.toGet(columnFamily, key));
+ return table.exists(converter.toGet(columnFamily, key));
}
@Override
- public Map.Entry<ThreatIntelResults,Long> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
- return Converter.INSTANCE.fromResult(table.get(Converter.INSTANCE.toGet(columnFamily, key)), columnFamily);
+ public LookupKV<ThreatIntelKey, ThreatIntelValue> get(ThreatIntelKey key, HTableInterface table, boolean logAccess) throws IOException {
+ return converter.fromResult(table.get(converter.toGet(columnFamily, key)), columnFamily);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index d3eef64..5cac558 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -13,232 +13,270 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Streaming</artifactId>
- <version>0.1BETA</version>
- </parent>
- <artifactId>Metron-DataLoads</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- </properties>
- <dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${global_hbase_guava_version}</version>
- </dependency>
- <dependency>
- <groupId>org.mitre</groupId>
- <artifactId>stix</artifactId>
- <version>1.2.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>Metron-Common</artifactId>
- <version>${project.parent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.googlecode.disruptor</groupId>
- <artifactId>disruptor</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!--dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>Metron-DataLoads</artifactId>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <httpcore.version>4.3.2</httpcore.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.2.11</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>Saxon-HE</artifactId>
+ <version>9.5.1-5</version>
<exclusions>
<exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
</exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ <version>2.2.5-2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mitre</groupId>
+ <artifactId>stix</artifactId>
+ <version>1.2.0.2</version>
+ <exclusions>
<exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
</exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Common</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
<exclusion>
<groupId>com.googlecode.disruptor</groupId>
<artifactId>disruptor</artifactId>
</exclusion>
</exclusions>
- </dependency-->
+ </dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${global_hbase_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.mitre.taxii</groupId>
+ <artifactId>taxii</artifactId>
+ <version>1.1.0.1</version>
+ <!--scope>provided</scope-->
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${global_hbase_version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.opencsv</groupId>
- <artifactId>opencsv</artifactId>
- <version>${global_opencsv_version}</version>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
-
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-testing-util</artifactId>
- <version>${global_hbase_version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!--dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${global_guava_version}</version>
- </dependency-->
-
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>${global_slf4j_version}</version>
- <scope>test</scope>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${global_opencsv_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${global_slf4j_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Testing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
</dependency>
-
- </dependencies>
- <build>
+ </dependencies>
+ <build>
<resources>
- <resource>
- <directory>src</directory>
- <excludes>
- <exclude>**/*.java</exclude>
- </excludes>
- </resource>
+ <resource>
+ <directory>src</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </resource>
</resources>
<plugins>
- <plugin>
- <!-- Separates the unit tests from the integration tests. -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.12.4</version>
- <configuration>
- <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
- <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
- <skip>true</skip>
- <!-- Show 100% of the lines from the stack trace (doesn't work) -->
- <trimStackTrace>false</trimStackTrace>
-
- </configuration>
- <executions>
- <execution>
- <id>unit-tests</id>
- <phase>test</phase>
- <goals>
- <goal>test</goal>
- </goals>
+ <plugin>
+ <!-- Separates the unit tests from the integration tests. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.4</version>
<configuration>
- <!-- Never skip running the tests when the test phase is invoked -->
- <skip>false</skip>
- <includes>
- <!-- Include unit tests within integration-test phase. -->
- <include>**/*Test.java</include>
- </includes>
- <excludes>
- <!-- Exclude integration tests within (unit) test phase. -->
- <exclude>**/*IntegrationTest.java</exclude>
- </excludes>
+ <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <skip>true</skip>
+ <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+ <trimStackTrace>false</trimStackTrace>
+
</configuration>
- </execution>
- <execution>
- <id>integration-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>test</goal>
- </goals>
+ <executions>
+ <execution>
+ <id>unit-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include unit tests within the (unit) test phase. -->
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- Exclude integration tests within (unit) test phase. -->
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the integration-test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include integration tests within integration-test phase. -->
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
<configuration>
- <!-- Never skip running the tests when the integration-test phase is invoked -->
- <skip>false</skip>
- <includes>
- <!-- Include integration tests within integration-test phase. -->
- <include>**/*IntegrationTest.java</include>
- </includes>
+ <source>1.7</source>
+ <target>1.7</target>
</configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.metron.httpcore.dataload</shadedPattern>
+ </relocation>
+ </relocations>
+ <!--minimizeJar>true</minimizeJar-->
+ <artifactSet>
+ <excludes>
+ <exclude>classworlds:classworlds</exclude>
+ <exclude>junit:junit</exclude>
+ <exclude>jmock:*</exclude>
+ <exclude>*:xml-apis</exclude>
+ <exclude>*slf4j*</exclude>
+ <exclude>org.apache.maven:lib:tests</exclude>
+ <exclude>log4j:log4j:jar:</exclude>
+ <exclude>*:hbase:*</exclude>
+ <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
<configuration>
- <relocations>
- <relocation>
- <pattern>com.google.common</pattern>
- <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
- </relocation>
- </relocations>
- <minimizeJar>true</minimizeJar>
- <artifactSet>
- <excludes>
- <exclude>classworlds:classworlds</exclude>
- <exclude>junit:junit</exclude>
- <exclude>jmock:*</exclude>
- <exclude>*:xml-apis</exclude>
- <exclude>*slf4j*</exclude>
- <exclude>org.apache.maven:lib:tests</exclude>
- <exclude>log4j:log4j:jar:</exclude>
- <exclude>*:hbase:*</exclude>
- <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
- </excludes>
- </artifactSet>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
</configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id> <!-- this is used for inheritance merges -->
- <phase>package</phase> <!-- bind to the packaging phase -->
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
- </build>
+ </build>
- </project>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
index 0b7a83c..32b07f3 100755
--- a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_load.sh
@@ -35,4 +35,4 @@ for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
fi
done
export HADOOP_CLASSPATH
-hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
index ee230c2..88cd946 100755
--- a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_bulk_prune.sh
@@ -34,4 +34,4 @@ for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
fi
done
export HADOOP_CLASSPATH
-hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
new file mode 100755
index 0000000..77011fb
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/threatintel_taxii_load.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+ . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+ . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+ if [ -f $jar ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.taxii.TaxiiLoader "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
deleted file mode 100644
index 4ee6e2f..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/LeastRecentlyUsedPruner.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import org.apache.commons.cli.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-public class LeastRecentlyUsedPruner {
- private static abstract class OptionHandler implements Function<String, Option> {}
- private enum BulkLoadOptions {
- HELP("h", new OptionHandler() {
-
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- return new Option(s, "help", false, "Generate Help screen");
- }
- }), TABLE("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "table", true, "HBase table to prune");
- o.setRequired(true);
- o.setArgName("HBASE_TABLE");
- return o;
- }
- }), COLUMN_FAMILY("f", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
- o.setRequired(false);
- o.setArgName("CF_NAME");
- return o;
- }
- })
- ,AS_OF_TIME("a", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
- o.setArgName("datetime");
- o.setRequired(true);
- return o;
- }
- })
- ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
- Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
- o.setArgName("format");
- o.setRequired(false);
- return o;
- }
- })
- ,ACCESS_TABLE("u", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
- o.setRequired(true);
- o.setArgName("HBASE_TABLE");
- return o;
- }
- }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
- o.setRequired(true);
- o.setArgName("CF_NAME");
- return o;
- }
- });
- Option option;
- String shortCode;
- BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
- this.shortCode = shortCode;
- this.option = optionHandler.apply(shortCode);
- }
-
- public boolean has(CommandLine cli) {
- return cli.hasOption(shortCode);
- }
-
- public String get(CommandLine cli) {
- return cli.getOptionValue(shortCode);
- }
- private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
- Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
- return d.getTime();
- }
-
- private static DateFormat getFormat(CommandLine cli) {
- DateFormat format = new SimpleDateFormat();
- if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
- format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
- }
- return format;
- }
-
- public static CommandLine parse(CommandLineParser parser, String[] args) {
- try {
- CommandLine cli = parser.parse(getOptions(), args);
- if(BulkLoadOptions.HELP.has(cli)) {
- printHelp();
- System.exit(0);
- }
- return cli;
- } catch (ParseException e) {
- System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
- e.printStackTrace(System.err);
- printHelp();
- System.exit(-1);
- return null;
- }
- }
-
- public static void printHelp() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
- }
-
- public static Options getOptions() {
- Options ret = new Options();
- for(BulkLoadOptions o : BulkLoadOptions.values()) {
- ret.addOption(o.option);
- }
- return ret;
- }
- }
-
- public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
- Scan scan = new Scan();
- if(cf != null) {
- scan.addFamily(Bytes.toBytes(cf));
- }
- scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
- scan.setCacheBlocks(false); // don't set to true for MR jobs
-// set other scan attrs
-
- TableMapReduceUtil.initTableMapperJob(
- sourceTable, // input table
- scan, // Scan instance to control CF and attribute selection
- PrunerMapper.class, // mapper class
- null, // mapper output key
- null, // mapper output value
- job);
- TableMapReduceUtil.initTableReducerJob(
- sourceTable, // output table
- null, // reducer class
- job);
- }
-
- public static Job createJob( Configuration conf
- , String table
- , String cf
- , String accessTrackerTable
- , String accessTrackerColumnFamily
- , Long ts
- ) throws IOException
- {
- Job job = new Job(conf);
- job.setJobName("LeastRecentlyUsedPruner: Pruning " + table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
- System.out.println("Configuring " + job.getJobName());
- job.setJarByClass(LeastRecentlyUsedPruner.class);
- job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
- job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
- job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
- job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
- setupHBaseJob(job, table, cf);
- job.setNumReduceTasks(0);
- return job;
- }
-
- public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
- Configuration conf = HBaseConfiguration.create();
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
- Long ts = BulkLoadOptions.getTimestamp(cli);
- String table = BulkLoadOptions.TABLE.get(cli);
- String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
- String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
- String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
- Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
deleted file mode 100644
index 246924c..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/ThreatIntelBulkLoader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.io.Files;
-import org.apache.commons.cli.*;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.text.*;
-import java.util.Date;
-
-public class ThreatIntelBulkLoader {
- private static abstract class OptionHandler implements Function<String, Option> {}
- private enum BulkLoadOptions {
- HELP("h", new OptionHandler() {
-
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- return new Option(s, "help", false, "Generate Help screen");
- }
- })
- ,TABLE("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "table", true, "HBase table to import data into");
- o.setRequired(true);
- o.setArgName("HBASE_TABLE");
- return o;
- }
- })
- ,COLUMN_FAMILY("f", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
- o.setRequired(true);
- o.setArgName("CF_NAME");
- return o;
- }
- })
- ,EXTRACTOR_CONFIG("e", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
- o.setArgName("JSON_FILE");
- o.setRequired(true);
- return o;
- }
- })
- ,INPUT_DATA("i", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
- o.setArgName("DIR");
- o.setRequired(true);
- return o;
- }
- })
- ,AS_OF_TIME("a", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
- o.setArgName("datetime");
- o.setRequired(false);
- return o;
- }
- })
- ,AS_OF_TIME_FORMAT("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
- o.setArgName("format");
- o.setRequired(false);
- return o;
- }
- })
- ;
- Option option;
- String shortCode;
- BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
- this.shortCode = shortCode;
- this.option = optionHandler.apply(shortCode);
- }
-
- public boolean has(CommandLine cli) {
- return cli.hasOption(shortCode);
- }
-
- public String get(CommandLine cli) {
- return cli.getOptionValue(shortCode);
- }
-
- public static CommandLine parse(CommandLineParser parser, String[] args) {
- try {
- CommandLine cli = parser.parse(getOptions(), args);
- if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
- printHelp();
- System.exit(0);
- }
- return cli;
- } catch (ParseException e) {
- System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
- e.printStackTrace(System.err);
- printHelp();
- System.exit(-1);
- return null;
- }
- }
-
- public static void printHelp() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
- }
-
- public static Options getOptions() {
- Options ret = new Options();
- for(BulkLoadOptions o : BulkLoadOptions.values()) {
- ret.addOption(o.option);
- }
- return ret;
- }
- }
- private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
- if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
- if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
- throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
- }
- else {
- DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
- Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
- return d.getTime();
- }
- }
- else {
- return System.currentTimeMillis();
- }
- }
- private static String readExtractorConfig(File configFile) throws IOException {
- return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
- }
-
- public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts) throws IOException {
- Job job = new Job(conf);
- job.setJobName("ThreatIntelBulkLoader: " + input + " => " + table + ":" + cf);
- System.out.println("Configuring " + job.getJobName());
- job.setJarByClass(ThreatIntelBulkLoader.class);
- job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
- job.setOutputFormatClass(TableOutputFormat.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
- job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
- job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
- job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
- job.setNumReduceTasks(0);
- ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
- handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
- return job;
- }
-
- public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
- Configuration conf = HBaseConfiguration.create();
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
- Long ts = getTimestamp(cli);
- String input = BulkLoadOptions.INPUT_DATA.get(cli);
- String table = BulkLoadOptions.TABLE.get(cli);
- String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
- String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
- Job job = createJob(conf, input, table, cf, extractorConfigContents, ts);
- System.out.println(conf);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
-}