You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/07 14:56:35 UTC
[2/6] incubator-metron git commit: METRON-93: Generalize the HBase
threat intel infrastructure to support enrichments closes
apache/incubator-metron#64
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
deleted file mode 100644
index f61d729..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import com.google.common.base.Splitter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
-import org.apache.metron.integration.util.UnitTestHelper;
-import org.apache.metron.integration.util.mock.MockHTable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-public class TaxiiIntegrationTest {
-
- @Before
- public void setup() throws IOException {
- MockTaxiiService.start(8282);
- }
-
- @After
- public void teardown() {
- MockTaxiiService.shutdown();
- MockHTable.Provider.clear();
- }
-
-
-
- @Test
- public void testTaxii() throws Exception {
- /**
- {
- "endpoint" : "http://localhost:8282/taxii-discovery-service"
- ,"type" : "DISCOVER"
- ,"collection" : "guest.Abuse_ch"
- ,"tableMap" : {
- "DomainName:FQDN" : "malicious_domain:cf"
- ,"Address:IPV_4_ADDR" : "malicious_address:cf"
- }
- }
- */
- String taxiiConnectionConfig = "{\n" +
- " \"endpoint\" : \"http://localhost:8282/taxii-discovery-service\"\n" +
- " ,\"type\" : \"DISCOVER\"\n" +
- " ,\"collection\" : \"guest.Abuse_ch\"\n" +
- " ,\"tableMap\" : {\n" +
- " \"DomainName:FQDN\" : \"malicious_domain:cf\"\n" +
- " ,\"Address:IPV_4_ADDR\" : \"malicious_address:cf\"\n" +
- " }\n" +
- " }";
- final MockHTable.Provider provider = new MockHTable.Provider();
- final Configuration config = HBaseConfiguration.create();
- TaxiiHandler handler = new TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), new StixExtractor(), config ) {
- @Override
- protected synchronized HTableInterface createHTable(TableInfo tableInfo) throws IOException {
- return provider.addToCache(tableInfo.getTableName(), tableInfo.getColumnFamily());
- }
- };
- //UnitTestHelper.verboseLogging();
- handler.run();
- Set<String> maliciousDomains;
- {
- MockHTable table = (MockHTable) provider.getTable(config, "malicious_domain");
- maliciousDomains = getIndicators(table.getPutLog(), "cf");
- }
- Assert.assertTrue(maliciousDomains.contains("www.office-112.com"));
- Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "DomainNameObj:Value condition=\"Equals\""), maliciousDomains.size());
- Set<String> maliciousAddresses;
- {
- MockHTable table = (MockHTable) provider.getTable(config, "malicious_address");
- maliciousAddresses= getIndicators(table.getPutLog(), "cf");
- }
- Assert.assertTrue(maliciousAddresses.contains("94.102.53.142"));
- Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "AddressObj:Address_Value condition=\"Equal\""), maliciousAddresses.size());
- MockHTable.Provider.clear();
- }
-
- private static int numStringsMatch(String xmlBundle, String text) {
- int cnt = 0;
- for(String line : Splitter.on("\n").split(xmlBundle)) {
- if(line.contains(text)) {
- cnt++;
- }
- }
- return cnt;
- }
-
- private static Set<String> getIndicators(Iterable<Put> puts, String cf) throws IOException {
- ThreatIntelConverter converter = new ThreatIntelConverter();
- Set<String> ret = new HashSet<>();
- for(Put p : puts) {
- ret.add(converter.fromPut(p, cf).getKey().indicator);
- }
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/pom.xml b/metron-streaming/Metron-Elasticsearch/pom.xml
index ab9242a..d2eade3 100644
--- a/metron-streaming/Metron-Elasticsearch/pom.xml
+++ b/metron-streaming/Metron-Elasticsearch/pom.xml
@@ -27,6 +27,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>Metron-Common</artifactId>
<version>${project.parent.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 45631f2..81aeec2 100644
--- a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -76,7 +76,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
indexName = sensorEnrichmentConfig.getIndex();
}
IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
- sensorType);
+ sensorType + "_doc");
indexRequestBuilder.setSource(message.toJSONString());
bulkRequest.add(indexRequestBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
index 2765c25..4f26365 100644
--- a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -54,7 +54,7 @@ public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegratio
if (elasticSearchComponent.hasIndex(index)) {
List<Map<String, Object>> docsFromDisk;
try {
- docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+ docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
docsFromDisk = readDocsFromDisk(hdfsDir);
System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
index 83e1aed..7399ade 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml
+++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
@@ -28,7 +28,7 @@
<mysql.version>5.1.31</mysql.version>
<slf4j.version>1.7.7</slf4j.version>
<storm.hdfs.version>0.1.2</storm.hdfs.version>
- <guava.version>${global_guava_version}</guava.version>
+ <guava.version>${global_hbase_guava_version}</guava.version>
</properties>
<dependencies>
<dependency>
@@ -99,6 +99,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${global_hadoop_version}</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
index 1d0b5c1..73a7ad5 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
@@ -20,12 +20,13 @@ package org.apache.metron.enrichment.adapters.cif;
import java.io.Serializable;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-public abstract class AbstractCIFAdapter implements EnrichmentAdapter,Serializable{
+public abstract class AbstractCIFAdapter implements EnrichmentAdapter<CacheKey>,Serializable{
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index 1ab3b83..63d6c0b 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.log4j.Logger;
@SuppressWarnings("unchecked")
-public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
+public class CIFHbaseAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
private static final long serialVersionUID = 1L;
private String _tableName;
@@ -55,12 +56,12 @@ public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
.getLogger(CIFHbaseAdapter.class);
@Override
- public void logAccess(String value) {
+ public void logAccess(CacheKey value) {
}
- public JSONObject enrich(String metadata) {
-
+ public JSONObject enrich(CacheKey k) {
+ String metadata = k.getValue();
JSONObject output = new JSONObject();
LOGGER.debug("=======Looking Up For:" + metadata);
output.putAll(getCIFObject(metadata));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 4e48756..5d12a29 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.geo;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.json.simple.JSONObject;
import java.net.InetAddress;
@@ -29,19 +30,19 @@ public class GeoAdapter extends JdbcAdapter {
private InetAddressValidator ipvalidator = new InetAddressValidator();
@Override
- public void logAccess(String value) {
+ public void logAccess(CacheKey value) {
}
@SuppressWarnings("unchecked")
@Override
- public JSONObject enrich(String value) {
+ public JSONObject enrich(CacheKey value) {
JSONObject enriched = new JSONObject();
try {
- InetAddress addr = InetAddress.getByName(value);
+ InetAddress addr = InetAddress.getByName(value.getValue());
if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
|| addr.isSiteLocalAddress() || addr.isMulticastAddress()
- || !ipvalidator.isValidInet4Address(value)) {
+ || !ipvalidator.isValidInet4Address(value.getValue())) {
return new JSONObject();
}
String locidQuery = "select IPTOLOCID(\"" + value
@@ -67,8 +68,7 @@ public class GeoAdapter extends JdbcAdapter {
}
resultSet.close();
} catch (Exception e) {
- e.printStackTrace();
- _LOG.error("Enrichment failure: " + e);
+ _LOG.error("Enrichment failure: " + e.getMessage(), e);
return new JSONObject();
}
return enriched;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
index 5118e5f..329456f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
@@ -20,13 +20,14 @@ package org.apache.metron.enrichment.adapters.host;
import java.io.Serializable;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
+public abstract class AbstractHostAdapter implements EnrichmentAdapter<CacheKey>,
Serializable{
/**
@@ -37,7 +38,7 @@ public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
.getLogger(AbstractHostAdapter.class);
abstract public boolean initializeAdapter();
- abstract public JSONObject enrich(String metadata);
+ abstract public JSONObject enrich(CacheKey metadata);
@Override
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
index c55b918..640e548 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.enrichment.adapters.host;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -53,14 +54,14 @@ public class HostFromJSONListAdapter extends AbstractHostAdapter {
}
@Override
- public void logAccess(String value) {
+ public void logAccess(CacheKey value) {
}
@SuppressWarnings("unchecked")
@Override
- public JSONObject enrich(String metadata) {
-
+ public JSONObject enrich(CacheKey k) {
+ String metadata = k.getValue();
if(!_known_hosts.containsKey(metadata))
return new JSONObject();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
index d7fcfbd..f92bd3f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -20,6 +20,7 @@ package org.apache.metron.enrichment.adapters.host;
import java.util.Map;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.json.simple.JSONObject;
@SuppressWarnings("serial")
@@ -43,20 +44,20 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
}
@Override
- public void logAccess(String value) {
+ public void logAccess(CacheKey value) {
}
@SuppressWarnings("unchecked")
@Override
- public JSONObject enrich(String metadata) {
+ public JSONObject enrich(CacheKey metadata) {
- if(!_known_hosts.containsKey(metadata))
+ if(!_known_hosts.containsKey(metadata.getValue()))
return new JSONObject();
JSONObject enrichment = new JSONObject();
- enrichment.put("known_info", (JSONObject) _known_hosts.get(metadata));
+ enrichment.put("known_info", (JSONObject) _known_hosts.get(metadata.getValue()));
return enrichment;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index 5eabdd2..b21044f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.enrichment.adapters.jdbc;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,7 +26,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.sql.*;
-public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
+public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
Serializable {
protected static final Logger _LOG = LoggerFactory
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
new file mode 100644
index 0000000..2b7d1a0
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.adapters.simplehbase;
+
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.hbase.lookup.EnrichmentLookup;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.accesstracker.NoopAccessTracker;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializable { // Enrichment adapter that answers CacheKey lookups through an EnrichmentLookup built from a SimpleHBaseConfig.
+ protected static final Logger _LOG = LoggerFactory.getLogger(SimpleHBaseAdapter.class);
+ protected SimpleHBaseConfig config; // set by the one-arg constructor or withConfig(); read in initializeAdapter()
+ protected EnrichmentLookup lookup; // assigned only in initializeAdapter(); null until that has run
+ // The no-arg constructor leaves config null until withConfig() is called.
+ public SimpleHBaseAdapter() {
+ }
+ public SimpleHBaseAdapter(SimpleHBaseConfig config) {
+ withConfig(config);
+ }
+ // Stores the given config and returns this adapter so the call can be chained.
+ public SimpleHBaseAdapter withConfig(SimpleHBaseConfig config) {
+ this.config = config;
+ return this;
+ }
+ // No-op: the body is empty.
+ @Override
+ public void logAccess(CacheKey value) {
+ }
+ // Reads the enrichment types mapped to the key's top-level field, transforms them with EnrichmentUtils.TypeToKey and passes them to lookup.get,
+ // then copies each non-null hit's metadata into the result; returns an empty JSONObject when no types are mapped or the key's value is null.
+ @Override
+ public JSONObject enrich(CacheKey value) {
+ JSONObject enriched = new JSONObject();
+ List<String> enrichmentTypes = value.getConfig()
+ .getFieldToEnrichmentTypeMap()
+ .get(EnrichmentUtils.toTopLevelField(value.getField()));
+ if(enrichmentTypes != null && value.getValue() != null) { // skip the lookup entirely without mapped types and a value
+ try {
+ for (LookupKV<EnrichmentKey, EnrichmentValue> kv :
+ lookup.get(Iterables.transform(enrichmentTypes
+ , new EnrichmentUtils.TypeToKey(value.getValue())
+ )
+ , lookup.getTable()
+ , false // NOTE(review): the meaning of this flag is not visible here - confirm against EnrichmentLookup.get
+ )
+ )
+ {
+ if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
+ for (Map.Entry<String, String> values : kv.getValue().getMetadata().entrySet()) {
+ enriched.put(kv.getKey().type + "." + values.getKey(), values.getValue()); // result key is "<type>.<metadata key>"
+ }
+ _LOG.trace("Enriched type " + kv.getKey().type + " => " + enriched);
+ }
+ }
+ }
+ catch (IOException e) {
+ _LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+ throw new RuntimeException("Unable to retrieve value: " + e.getMessage(), e); // logged above, then rethrown unchecked with the cause kept
+ }
+ }
+ return enriched;
+ }
+ // Builds lookup from the table that config's provider returns for config.getHBaseTable(), plus config.getHBaseCF() and a NoopAccessTracker; always returns true.
+ @Override
+ public boolean initializeAdapter() {
+ String hbaseTable = config.getHBaseTable();
+ Configuration hbaseConfig = HBaseConfiguration.create(); // a new configuration object is created on every call
+ try {
+ lookup = new EnrichmentLookup( config.getProvider().getTable(hbaseConfig, hbaseTable)
+ , config.getHBaseCF()
+ , new NoopAccessTracker()
+ );
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to initialize adapter: " + e.getMessage(), e); // IOException is rethrown unchecked with the cause kept
+ }
+ return true;
+ }
+ // Closes lookup; any exception is wrapped in a RuntimeException with the cause preserved.
+ @Override
+ public void cleanup() {
+ try {
+ lookup.close(); // NOTE(review): lookup is null if initializeAdapter() never ran; the resulting NPE would be caught and wrapped below
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to cleanup access tracker", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
new file mode 100644
index 0000000..fefe008
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.simplehbase;
+
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+
+
+public class SimpleHBaseConfig implements Serializable { // Serializable holder for an HBase table name, a column-family string and a TableProvider; the with* setters return this.
+ private String hBaseTable; // set by withHBaseTable(); null until then
+ private String hBaseCF; // set by withHBaseCF(); null until then
+ private TableProvider provider = new HTableProvider(); // default provider unless withProviderImpl() replaces it
+ public String getHBaseTable() {
+ return hBaseTable;
+ }
+ public String getHBaseCF() {
+ return hBaseCF;
+ }
+ // Returns the current provider: the HTableProvider default or whatever withProviderImpl() installed.
+ public TableProvider getProvider() {
+ return provider;
+ }
+ // Replaces the provider with the result of EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider()); the second argument is presumably a fallback - confirm.
+ public SimpleHBaseConfig withProviderImpl(String connectorImpl) {
+ provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider());
+ return this;
+ }
+ public SimpleHBaseConfig withHBaseTable(String hBaseTable) { // stores the table name and returns this config for chaining
+ this.hBaseTable = hBaseTable;
+ return this;
+ }
+ // Stores the value later returned by getHBaseCF() and returns this config for chaining.
+ public SimpleHBaseConfig withHBaseCF(String cf) {
+ this.hBaseCF= cf;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
deleted file mode 100644
index 9c14cdf..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.threat;
-
-import java.io.Serializable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Serializable{
-
-
- private static final long serialVersionUID = 1524030932856141771L;
- protected static final Logger LOG = LoggerFactory
- .getLogger(AbstractThreatAdapter.class);
-
- abstract public boolean initializeAdapter();
-
- @Override
- public void cleanup() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
deleted file mode 100644
index 1ce99cb..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.threat;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("unchecked")
-public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
- Serializable {
-
- protected static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(ThreatHbaseAdapter.class);
- private static final long serialVersionUID = 1L;
- private String _tableName;
- private HTableInterface table;
- private String _quorum;
- private String _port;
-
- public ThreatHbaseAdapter(String quorum, String port, String tableName) {
- _quorum = quorum;
- _port = port;
- _tableName = tableName;
- }
-
- /** The LOGGER. */
- private static final Logger LOGGER = Logger
- .getLogger(ThreatHbaseAdapter.class);
-
- @Override
- public void logAccess(String value) {
-
- }
-
- public JSONObject enrich(String metadata) {
-
- JSONObject output = new JSONObject();
- LOGGER.debug("=======Looking Up For:" + metadata);
- output.putAll(getThreatObject(metadata));
-
- return output;
- }
-
- @SuppressWarnings({ "rawtypes", "deprecation" })
- protected Map getThreatObject(String key) {
-
- LOGGER.debug("=======Pinging HBase For:" + key);
-
- Get get = new Get(Bytes.toBytes(key));
- Result rs;
- Map output = new HashMap();
-
- try {
- rs = table.get(get);
-
- if (!rs.isEmpty()) {
- byte[] source_family = Bytes.toBytes("source");
- JSONParser parser = new JSONParser();
-
- Map<byte[], byte[]> sourceFamilyMap = rs.getFamilyMap(source_family);
-
- for (Map.Entry<byte[], byte[]> entry : sourceFamilyMap.entrySet()) {
- String k = Bytes.toString(entry.getKey());
- LOGGER.debug("=======Found intel from source: " + k);
- output.put(k,parser.parse(Bytes.toString(entry.getValue())));
- }
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return output;
- }
-
- @Override
- public boolean initializeAdapter() {
-
- // Initialize HBase Table
- Configuration conf = null;
- conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", _quorum);
- conf.set("hbase.zookeeper.property.clientPort", _port);
-
- try {
- LOGGER.debug("=======Connecting to HBASE===========");
- LOGGER.debug("=======ZOOKEEPER = "
- + conf.get("hbase.zookeeper.quorum"));
- HConnection connection = HConnectionManager.createConnection(conf);
- table = connection.getTable(_tableName);
- return true;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- LOGGER.debug("=======Unable to Connect to HBASE===========");
- e.printStackTrace();
- }
-
- return false;
- }
-
- @Override
- public void cleanup() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
new file mode 100644
index 0000000..c80b57a
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.threatintel;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.lookup.EnrichmentLookup;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Enrichment adapter that marks indicator values found in an HBase-backed
+ * {@link EnrichmentLookup} as threat intel alerts.
+ *
+ * <p>A {@link ThreatIntelConfig} must be supplied and {@link #initializeAdapter()}
+ * called before {@link #logAccess(CacheKey)} or {@link #enrich(CacheKey)} are used:
+ * both dereference {@code lookup}, which is only created during initialization.
+ */
+public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
+  protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
+  protected ThreatIntelConfig config;
+  protected EnrichmentLookup lookup;
+
+  /** Creates an adapter without a config; supply one via {@link #withConfig(ThreatIntelConfig)}. */
+  public ThreatIntelAdapter() {
+  }
+
+  /**
+   * Creates an adapter using the given config.
+   *
+   * @param config table, tracker and bloom filter settings read by {@link #initializeAdapter()}
+   */
+  public ThreatIntelAdapter(ThreatIntelConfig config) {
+    withConfig(config);
+  }
+
+  /**
+   * Sets the config used by {@link #initializeAdapter()}.
+   *
+   * @param config the config to use
+   * @return this adapter, for chaining
+   */
+  public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
+    this.config = config;
+    return this;
+  }
+
+  /**
+   * Records an access for every threat intel type configured for the key's field.
+   * Does nothing when no types are configured for that field.
+   *
+   * @param value the field/value/config triple being enriched
+   */
+  @Override
+  public void logAccess(CacheKey value) {
+    List<String> enrichmentTypes = threatIntelTypesFor(value);
+    if (enrichmentTypes == null) {
+      return;
+    }
+    for (String enrichmentType : enrichmentTypes) {
+      lookup.getAccessTracker().logAccess(new EnrichmentKey(enrichmentType, value.getValue()));
+    }
+  }
+
+  /**
+   * Checks the key's value against every threat intel type configured for its field.
+   *
+   * @param value the field/value/config triple being enriched
+   * @return a JSON object mapping each matching threat intel type to {@code "alert"};
+   *         empty when nothing matches or no types are configured for the field
+   * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
+   */
+  @Override
+  public JSONObject enrich(CacheKey value) {
+    JSONObject enriched = new JSONObject();
+    List<String> enrichmentTypes = threatIntelTypesFor(value);
+    if (enrichmentTypes == null) {
+      return enriched;
+    }
+    // The index below pairs each answer with its type, so this relies on exists()
+    // yielding one answer per key in the order the keys were supplied.
+    int i = 0;
+    try {
+      for (Boolean isThreat : lookup.exists(
+              Iterables.transform(enrichmentTypes, new EnrichmentUtils.TypeToKey(value.getValue())),
+              lookup.getTable(),
+              false)) {
+        String enrichmentType = enrichmentTypes.get(i++);
+        if (isThreat) {
+          enriched.put(enrichmentType, "alert");
+          _LOG.trace("Enriched value => {}", enriched);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to retrieve value", e);
+    }
+    return enriched;
+  }
+
+  /**
+   * Builds the access tracker and the enrichment lookup from the config.
+   *
+   * @return always {@code true}; failures are reported by exception
+   * @throws IllegalStateException if no config was set or an HBase table cannot be obtained
+   */
+  @Override
+  public boolean initializeAdapter() {
+    if (config == null) {
+      throw new IllegalStateException("Unable to initialize ThreatIntelAdapter: no config was provided");
+    }
+    String hbaseTable = config.getHBaseTable();
+    BloomAccessTracker bat = new BloomAccessTracker(hbaseTable,
+                                                    config.getExpectedInsertions(),
+                                                    config.getFalsePositiveRate());
+    Configuration hbaseConfig = HBaseConfiguration.create();
+    try {
+      PersistentAccessTracker accessTracker = new PersistentAccessTracker(
+              hbaseTable,
+              UUID.randomUUID().toString(),
+              config.getProvider().getTable(hbaseConfig, config.getTrackerHBaseTable()),
+              config.getTrackerHBaseCF(),
+              bat,
+              config.getMillisecondsBetweenPersists());
+      lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable),
+                                    config.getHBaseCF(),
+                                    accessTracker);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+    }
+    return true;
+  }
+
+  /**
+   * Closes the lookup. A no-op when the adapter was never initialized.
+   *
+   * @throws RuntimeException wrapping any failure raised while closing
+   */
+  @Override
+  public void cleanup() {
+    if (lookup == null) {
+      return;
+    }
+    try {
+      lookup.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to cleanup access tracker", e);
+    }
+  }
+
+  /**
+   * Returns the threat intel types configured for the key's top-level field, or
+   * {@code null} when none are configured. Shared by logAccess and enrich so that
+   * accesses are logged for exactly the keys that are looked up.
+   */
+  private static List<String> threatIntelTypesFor(CacheKey value) {
+    return value.getConfig()
+                .getFieldToThreatIntelTypeMap()
+                .get(EnrichmentUtils.toTopLevelField(value.getField()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
new file mode 100644
index 0000000..ff29aea
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.threatintel;
+
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Serializable settings for the threat intel adapter: the HBase table and column
+ * family holding the threat intel, the table and column family used by the access
+ * tracker, the bloom filter sizing and the interval between tracker persists.
+ *
+ * <p>All {@code with*} methods mutate this instance and return it for chaining.
+ */
+public class ThreatIntelConfig implements Serializable {
+  /** Milliseconds in one hour: 1000 ms * 60 s * 60 min. */
+  public static final long MS_IN_HOUR = 1000L*60*60;
+  private String hBaseTable;
+  private String hBaseCF;
+  private double falsePositiveRate = 0.03;
+  private int expectedInsertions = 100000;
+  private String trackerHBaseTable;
+  private String trackerHBaseCF;
+  // Defaults to two hours between persists.
+  private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
+  private TableProvider provider = new HTableProvider();
+
+  /** @return the name of the HBase table holding the threat intel; null until set */
+  public String getHBaseTable() {
+    return hBaseTable;
+  }
+
+  /** @return the expected number of insertions used to size the access tracker (default 100000) */
+  public int getExpectedInsertions() {
+    return expectedInsertions;
+  }
+
+  /** @return the false positive rate used for the access tracker (default 0.03) */
+  public double getFalsePositiveRate() {
+    return falsePositiveRate;
+  }
+
+  /** @return the name of the HBase table used by the access tracker; null until set */
+  public String getTrackerHBaseTable() {
+    return trackerHBaseTable;
+  }
+
+  /** @return the column family used by the access tracker; null until set */
+  public String getTrackerHBaseCF() {
+    return trackerHBaseCF;
+  }
+
+  /** @return the interval between access tracker persists, in milliseconds */
+  public long getMillisecondsBetweenPersists() {
+    return millisecondsBetweenPersists;
+  }
+
+  /** @return the column family holding the threat intel; null until set */
+  public String getHBaseCF() {
+    return hBaseCF;
+  }
+
+  /** @return the provider used to obtain HBase tables (an {@link HTableProvider} by default) */
+  public TableProvider getProvider() {
+    return provider;
+  }
+
+  /**
+   * Selects the table provider by class name.
+   *
+   * @param connectorImpl class name handed to {@link EnrichmentUtils#getTableProvider};
+   *                      a new {@link HTableProvider} is passed as the fallback
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withProviderImpl(String connectorImpl) {
+    provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider());
+    return this;
+  }
+
+  /**
+   * @param hBaseTable name of the HBase table used by the access tracker
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
+    this.trackerHBaseTable = hBaseTable;
+    return this;
+  }
+
+  /**
+   * @param cf column family used by the access tracker
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withTrackerHBaseCF(String cf) {
+    this.trackerHBaseCF = cf;
+    return this;
+  }
+
+  /**
+   * @param hBaseTable name of the HBase table holding the threat intel
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withHBaseTable(String hBaseTable) {
+    this.hBaseTable = hBaseTable;
+    return this;
+  }
+
+  /**
+   * @param cf column family holding the threat intel
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withHBaseCF(String cf) {
+    this.hBaseCF = cf;
+    return this;
+  }
+
+  /**
+   * @param falsePositiveRate false positive rate for the access tracker
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
+    this.falsePositiveRate = falsePositiveRate;
+    return this;
+  }
+
+  /**
+   * @param expectedInsertions expected number of insertions for the access tracker
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
+    this.expectedInsertions = expectedInsertions;
+    return this;
+  }
+
+  /**
+   * @param millisecondsBetweenPersists interval between access tracker persists, in milliseconds
+   * @return this config, for chaining
+   */
+  public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
+    this.millisecondsBetweenPersists = millisecondsBetweenPersists;
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index 442e609..8f0f589 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
@@ -36,7 +37,7 @@ import org.apache.metron.tldextractor.BasicTldExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
+public class WhoisHBaseAdapter implements EnrichmentAdapter<CacheKey>,
Serializable {
protected static final Logger LOG = LoggerFactory
@@ -80,7 +81,7 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
LOG.trace("--------CONNECTED TO TABLE: " + table);
- JSONObject tester = enrich("cisco.com");
+ JSONObject tester = enrich(new CacheKey("whois", "cisco.com", null));
if (tester.keySet().size() == 0)
throw new IOException(
@@ -96,13 +97,13 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
}
@Override
- public void logAccess(String value) {
+ public void logAccess(CacheKey value) {
}
@SuppressWarnings({ "unchecked", "deprecation" })
- public JSONObject enrich(String metadataIn) {
-
+ public JSONObject enrich(CacheKey k) {
+ String metadataIn = k.getValue();
String metadata = tldex.extract2LD(metadataIn);
LOG.trace("[Metron] Pinging HBase For:" + metadata);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
new file mode 100644
index 0000000..5d5b1e1
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.domain.SensorEnrichmentConfig;
+
+/**
+ * Key for the enrichment cache: a field name, the value found in that field and
+ * the sensor enrichment config in effect. All three take part in equality and
+ * hashing; the config is left out of {@link #toString()}.
+ */
+public class CacheKey {
+  private String field;
+  private String value;
+  private SensorEnrichmentConfig config;
+
+  /**
+   * @param field  name of the field being enriched
+   * @param value  value of that field
+   * @param config sensor enrichment config associated with the lookup; may be null
+   */
+  public CacheKey(String field, String value, SensorEnrichmentConfig config) {
+    this.field = field;
+    this.value = value;
+    this.config = config;
+  }
+
+  /** @return the field name */
+  public String getField() {
+    return field;
+  }
+
+  /** @return the field value */
+  public String getValue() {
+    return value;
+  }
+
+  /** @return the sensor enrichment config; may be null */
+  public SensorEnrichmentConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public String toString() {
+    return "CacheKey{field='" + field + "', value='" + value + "'}";
+  }
+
+  /** Two keys are equal when field, value and config are all equal (null-safe). */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CacheKey other = (CacheKey) o;
+    return java.util.Objects.equals(getField(), other.getField())
+        && java.util.Objects.equals(getValue(), other.getValue())
+        && java.util.Objects.equals(config, other.config);
+  }
+
+  /** Combines the hashes of field, value and config, treating null as 0. */
+  @Override
+  public int hashCode() {
+    int hash = java.util.Objects.hashCode(getField());
+    hash = 31 * hash + java.util.Objects.hashCode(getValue());
+    hash = 31 * hash + java.util.Objects.hashCode(config);
+    return hash;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index bfb4d91..9b47d71 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -20,6 +20,7 @@ package org.apache.metron.enrichment.bolt;
import backtype.storm.task.TopologyContext;
import org.apache.metron.bolt.JoinBolt;
import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
import org.apache.metron.topology.TopologyUtils;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
@@ -56,8 +57,11 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
public Set<String> getStreamIds(JSONObject message) {
Set<String> streamIds = new HashSet<>();
String sourceType = TopologyUtils.getSensorType(message);
- for (String enrichmentType : getFieldMap(sourceType).keySet()) {
- streamIds.add(enrichmentType);
+ Map<String, List<String>> fieldMap = getFieldMap(sourceType);
+ if(fieldMap != null) {
+ for (String enrichmentType : getFieldMap(sourceType).keySet()) {
+ streamIds.add(enrichmentType);
+ }
}
streamIds.add("message");
return streamIds;
@@ -85,7 +89,19 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
return message;
}
- protected Map<String, List<String>> getFieldMap(String sensorType) {
- return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
+ public Map<String, List<String>> getFieldMap(String sourceType) {
+ if(sourceType != null) {
+ SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+ if (config != null) {
+ return config.getEnrichmentFieldMap();
+ }
+ else {
+ LOG.error("Unable to retrieve a sensor enrichment config of " + sourceType);
+ }
+ }
+ else {
+ LOG.error("Trying to retrieve a field map with source type of null");
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 08b223c..7b76c57 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -30,6 +30,7 @@ import com.google.common.cache.LoadingCache;
import org.apache.metron.Constants;
import org.apache.metron.bolt.ConfiguredBolt;
import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.apache.metron.helpers.topology.ErrorUtils;
import org.json.simple.JSONObject;
@@ -65,13 +66,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
.getLogger(GenericEnrichmentBolt.class);
private OutputCollector collector;
-
protected String enrichmentType;
- protected EnrichmentAdapter adapter;
- protected transient CacheLoader<String, JSONObject> loader;
- protected transient LoadingCache<String, JSONObject> cache;
+ protected EnrichmentAdapter<CacheKey> adapter;
+ protected transient CacheLoader<CacheKey, JSONObject> loader;
+ protected transient LoadingCache<CacheKey, JSONObject> cache;
protected Long maxCacheSize;
protected Long maxTimeRetain;
+ protected boolean invalidateCacheOnReload = false;
public GenericEnrichmentBolt(String zookeeperUrl) {
super(zookeeperUrl);
@@ -108,9 +109,23 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
return this;
}
+ /**
+  * Chooses whether the enrichment cache is emptied by {@code reloadCallback()}.
+  *
+  * @param cacheInvalidationOnReload {@code true} to invalidate all cached entries on reload
+  * @return this bolt, for chaining
+  */
+ public GenericEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) {
+   this.invalidateCacheOnReload = cacheInvalidationOnReload;
+   return this;
+ }
+ /**
+  * Drops every cached enrichment when invalidation on reload is enabled and the
+  * cache has been created; otherwise does nothing.
+  * NOTE(review): presumably invoked by the parent bolt when configuration is
+  * reloaded - confirm against ConfiguredBolt.
+  */
+ @Override
+ protected void reloadCallback() {
+   if (invalidateCacheOnReload && cache != null) {
+     cache.invalidateAll();
+   }
+ }
+
@Override
public void prepare(Map conf, TopologyContext topologyContext,
OutputCollector collector) {
+ super.prepare(conf, topologyContext, collector);
this.collector = collector;
if (this.maxCacheSize == null)
throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
@@ -118,8 +133,8 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
if (this.adapter == null)
throw new IllegalStateException("Adapter must be specified");
- loader = new CacheLoader<String, JSONObject>() {
- public JSONObject load(String key) throws Exception {
+ loader = new CacheLoader<CacheKey, JSONObject>() {
+ public JSONObject load(CacheKey key) throws Exception {
return adapter.enrich(key);
}
};
@@ -144,6 +159,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
public void execute(Tuple tuple) {
String key = tuple.getStringByField("key");
JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
+
JSONObject enrichedMessage = new JSONObject();
enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
try {
@@ -151,6 +167,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
throw new Exception("Could not parse binary stream to JSON");
if (key == null)
throw new Exception("Key is not valid");
+ String sourceType = null;
+ if(rawMessage.containsKey(Constants.SENSOR_TYPE)) {
+ sourceType = rawMessage.get(Constants.SENSOR_TYPE).toString();
+ }
+ else {
+ throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
+ }
for (Object o : rawMessage.keySet()) {
String field = (String) o;
String value = (String) rawMessage.get(field);
@@ -159,8 +182,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
} else {
JSONObject enrichedField = new JSONObject();
if (value != null && value.length() != 0) {
- adapter.logAccess(value);
- enrichedField = cache.getUnchecked(value);
+ SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+ if(config == null) {
+ throw new RuntimeException("Unable to find " + config);
+ }
+ CacheKey cacheKey= new CacheKey(field, value, config);
+ adapter.logAccess(cacheKey);
+ enrichedField = cache.getUnchecked(cacheKey);
if (enrichedField == null)
throw new Exception("[Metron] Could not enrich string: "
+ value);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 014e0a9..d0bc833 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.enrichment.bolt;
+import org.apache.metron.domain.SensorEnrichmentConfig;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,15 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
}
@Override
- public Map<String, List<String>> getFieldMap(String sensorType) {
- return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
+ public Map<String, List<String>> getFieldMap(String sourceType) {
+ SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+ if(config != null) {
+ return config.getThreatIntelFieldMap();
+ }
+ else {
+ LOG.error("Unable to retrieve sensor config: " + sourceType);
+ return null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index 228f844..7a58673 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -17,7 +17,16 @@
*/
package org.apache.metron.enrichment.utils;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.InvocationTargetException;
public class EnrichmentUtils {
@@ -27,6 +36,46 @@ public class EnrichmentUtils {
return Joiner.on(".").join(new String[]{KEY_PREFIX, enrichmentName, field});
}
+ /**
+  * Function that pairs a fixed indicator with each enrichment type it is
+  * applied to, producing the corresponding {@link EnrichmentKey}.
+  */
+ public static class TypeToKey implements Function<String, EnrichmentKey> {
+   private final String indicator;
+
+   /**
+    * @param indicator the indicator value placed in every key produced
+    */
+   public TypeToKey(String indicator) {
+     this.indicator = indicator;
+   }
+
+   /**
+    * @param enrichmentType the enrichment type for the key
+    * @return a new key built from {@code enrichmentType} and the fixed indicator
+    */
+   @Nullable
+   @Override
+   public EnrichmentKey apply(@Nullable String enrichmentType) {
+     return new EnrichmentKey(enrichmentType, indicator);
+   }
+ }
+ /**
+  * Returns the last dot-separated segment of a field name, e.g.
+  * {@code "a.b.c"} becomes {@code "c"}; a name without dots is returned as is.
+  *
+  * @param field the possibly dotted field name; may be null
+  * @return the segment after the last {@code '.'}, or {@code null} when {@code field} is null
+  */
+ public static String toTopLevelField(String field) {
+   return field == null ? null : Iterables.getLast(Splitter.on('.').split(field));
+ }
+ public static TableProvider getTableProvider(String connectorImpl, TableProvider defaultImpl) {
+ if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+ return defaultImpl;
+ }
+ else {
+ try {
+ Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
+ return clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Unable to instantiate connector.", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalStateException("Unable to instantiate connector", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
deleted file mode 100644
index b95f4b8..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
-import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
-import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.UUID;
-
-public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializable {
- protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
- protected ThreatIntelConfig config;
- protected ThreatIntelLookup lookup;
-
- public ThreatIntelAdapter() {
- }
- public ThreatIntelAdapter(ThreatIntelConfig config) {
- withConfig(config);
- }
-
- public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
- this.config = config;
- return this;
- }
-
- @Override
- public void logAccess(String value) {
- lookup.getAccessTracker().logAccess(new ThreatIntelKey(value));
- }
-
- @Override
- public JSONObject enrich(String value) {
- JSONObject enriched = new JSONObject();
- boolean isThreat = false;
- try {
- isThreat = lookup.exists(new ThreatIntelKey(value), lookup.getTable(), false);
- } catch (IOException e) {
- throw new RuntimeException("Unable to retrieve value", e);
- }
- if(isThreat) {
- enriched.put(config.getHBaseTable(), "alert");
- _LOG.trace("Enriched value => " + enriched);
- }
- //throw new RuntimeException("Unable to retrieve value " + value);
- return enriched;
- }
-
- @Override
- public boolean initializeAdapter() {
- PersistentAccessTracker accessTracker;
- String hbaseTable = config.getHBaseTable();
- int expectedInsertions = config.getExpectedInsertions();
- double falsePositives = config.getFalsePositiveRate();
- String trackerHBaseTable = config.getTrackerHBaseTable();
- String trackerHBaseCF = config.getTrackerHBaseCF();
- long millisecondsBetweenPersist = config.getMillisecondsBetweenPersists();
- BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
- Configuration hbaseConfig = HBaseConfiguration.create();
- try {
- accessTracker = new PersistentAccessTracker( hbaseTable
- , UUID.randomUUID().toString()
- , config.getProvider().getTable(hbaseConfig, trackerHBaseTable)
- , trackerHBaseCF
- , bat
- , millisecondsBetweenPersist
- );
- lookup = new ThreatIntelLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
- }
-
- return true;
- }
-
- @Override
- public void cleanup() {
- try {
- lookup.close();
- } catch (Exception e) {
- throw new RuntimeException("Unable to cleanup access tracker", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
deleted file mode 100644
index bb45468..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-
-public class ThreatIntelConfig implements Serializable {
- public static final long MS_IN_HOUR = 10000*60*60;
- private String hBaseTable;
- private String hBaseCF;
- private double falsePositiveRate = 0.03;
- private int expectedInsertions = 100000;
- private String trackerHBaseTable;
- private String trackerHBaseCF;
- private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
- private TableProvider provider = new HTableProvider();
-
- public String getHBaseTable() {
- return hBaseTable;
- }
-
- public int getExpectedInsertions() {
- return expectedInsertions;
- }
-
- public double getFalsePositiveRate() {
- return falsePositiveRate;
- }
-
- public String getTrackerHBaseTable() {
- return trackerHBaseTable;
- }
-
- public String getTrackerHBaseCF() {
- return trackerHBaseCF;
- }
-
- public long getMillisecondsBetweenPersists() {
- return millisecondsBetweenPersists;
- }
-
- public String getHBaseCF() {
- return hBaseCF;
- }
-
- public TableProvider getProvider() {
- return provider;
- }
-
- public ThreatIntelConfig withProviderImpl(String connectorImpl) {
- if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
- provider = new HTableProvider();
- }
- else {
- try {
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
- provider = clazz.getConstructor().newInstance();
- } catch (InstantiationException e) {
- throw new IllegalStateException("Unable to instantiate connector.", e);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
- } catch (InvocationTargetException e) {
- throw new IllegalStateException("Unable to instantiate connector", e);
- } catch (NoSuchMethodException e) {
- throw new IllegalStateException("Unable to instantiate connector: no such method", e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("Unable to instantiate connector: class not found", e);
- }
- }
- return this;
- }
-
- public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
- this.trackerHBaseTable = hBaseTable;
- return this;
- }
-
- public ThreatIntelConfig withTrackerHBaseCF(String cf) {
- this.trackerHBaseCF = cf;
- return this;
- }
- public ThreatIntelConfig withHBaseTable(String hBaseTable) {
- this.hBaseTable = hBaseTable;
- return this;
- }
-
- public ThreatIntelConfig withHBaseCF(String cf) {
- this.hBaseCF= cf;
- return this;
- }
-
- public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
- this.falsePositiveRate = falsePositiveRate;
- return this;
- }
-
- public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
- this.expectedInsertions = expectedInsertions;
- return this;
- }
-
- public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
- this.millisecondsBetweenPersists = millisecondsBetweenPersists;
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
index 6421a21..6ccc21e 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.cif;
import java.net.InetAddress;
import java.util.Properties;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.apache.metron.test.AbstractTestContext;
import org.junit.Assert;
@@ -174,17 +175,17 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#enrich(java.lang.String)}.
+ * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#enrich(CacheKey)}.
*/
public void testEnrich() {
if(skipTests(this.getMode())){
return;//skip tests
}else{
cifHbaseAdapter.initializeAdapter();
- Assert.assertNotNull(cifHbaseAdapter.enrich("testinvalid.metadata"));
+ Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "testinvalid.metadata", null)));
- Assert.assertNotNull(cifHbaseAdapter.enrich("ivalid.ip"));
- Assert.assertNotNull(cifHbaseAdapter.enrich("1.1.1.10"));
+ Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "ivalid.ip", null)));
+ Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "1.1.1.10", null)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
index 9a0fe2a..8898c64 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -21,6 +21,7 @@ import java.util.Properties;
import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
import org.apache.metron.enrichment.adapters.jdbc.MySqlConfig;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
@@ -113,7 +114,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
}else{
try {
- JSONObject json = geoMySqlAdapter.enrich("72.163.4.161");
+ JSONObject json = geoMySqlAdapter.enrich(new CacheKey("dummy", "72.163.4.161", null));
//assert Geo Response is not null
System.out.println("json ="+json);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
index a56f5ba..ccf2ea3 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.whois;
import java.net.InetAddress;
import java.util.Properties;
+import org.apache.metron.enrichment.bolt.CacheKey;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.json.simple.JSONObject;
@@ -125,13 +126,13 @@ public class WhoisHBaseAdapterTest extends AbstractTestContext {
}
/**
- * Test method for {@link org.apache.metron.enrichment.adapters.whois.WhoisHBaseAdapter#enrich(java.lang.String)}.
+ * Test method for {@link org.apache.metron.enrichment.adapters.whois.WhoisHBaseAdapter#enrich(CacheKey)}.
*/
public void testEnrich() {
if(skipTests(this.getMode())){
return;//skip tests
}else{
- JSONObject json = whoisHbaseAdapter.enrich("72.163.4.161");
+ JSONObject json = whoisHbaseAdapter.enrich(new CacheKey("whois", "72.163.4.161", null));
//assert Geo Response is not null
Assert.assertNotNull(json);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index a697aa8..2687213 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -37,6 +37,17 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
@@ -56,11 +67,12 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${global_junit_version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>${global_guava_version}</version>
+ <version>${global_hbase_guava_version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/pom.xml b/metron-streaming/Metron-Solr/pom.xml
index cbb7395..925c219 100644
--- a/metron-streaming/Metron-Solr/pom.xml
+++ b/metron-streaming/Metron-Solr/pom.xml
@@ -27,6 +27,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>Metron-Common</artifactId>
<version>${project.parent.version}</version>