You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:33 UTC
[03/52] [abbrv] metron git commit: METRON-1460: Create a
complementary non-split-join enrichment topology closes apache/metron#940
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
new file mode 100644
index 0000000..5f82b1c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+
+/**
+ * The strategy to use to construct the thread pool.
+ */
+public enum WorkerPoolStrategies {
+ /**
+ * Fixed thread pool
+ */
+ FIXED(numThreads -> Executors.newFixedThreadPool(numThreads)),
+ /**
+ * Work stealing thread pool.
+ */
+ WORK_STEALING(numThreads -> Executors.newWorkStealingPool(numThreads))
+ ;
+ Function<Integer, ExecutorService> creator;
+ WorkerPoolStrategies(Function<Integer, ExecutorService> creator) {
+ this.creator = creator;
+ }
+
+ public ExecutorService create(int numThreads) {
+ return creator.apply(numThreads);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index ab3d462..63d39c5 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
@@ -28,6 +29,7 @@ import org.apache.metron.enrichment.lookup.EnrichmentLookup;
import org.apache.metron.enrichment.lookup.handler.KeyWithContext;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.json.simple.JSONObject;
import sun.management.Sensor;
import javax.annotation.Nullable;
@@ -118,4 +120,18 @@ public class EnrichmentUtils {
}
}
+ public static JSONObject adjustKeys(JSONObject enrichedMessage, JSONObject enrichedField, String field, String prefix) {
+ if ( !enrichedField.isEmpty()) {
+ for (Object enrichedKey : enrichedField.keySet()) {
+ if(!StringUtils.isEmpty(prefix)) {
+ enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey));
+ }
+ else {
+ enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey));
+ }
+ }
+ }
+ return enrichedMessage;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
index 7898ccd..870d709 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
@@ -18,15 +18,142 @@
package org.apache.metron.enrichment.utils;
import com.google.common.base.Joiner;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore;
+import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore;
+import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.threatintel.triage.ThreatTriageProcessor;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
public class ThreatIntelUtils {
+ public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String KEY_PREFIX = "threatintels";
+ /**
+ * The message key under which the overall threat triage score is stored.
+ */
+ public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score";
+
+ /**
+ * The prefix of the message keys that record the threat triage rules that fired.
+ */
+ public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules";
+
+ /**
+ * The portion of the message key used to record the 'name' field of a rule.
+ */
+ public static final String THREAT_TRIAGE_RULE_NAME = "name";
+
+ /**
+ * The portion of the message key used to record the 'comment' field of a rule.
+ */
+ public static final String THREAT_TRIAGE_RULE_COMMENT = "comment";
+
+ /**
+ * The portion of the message key used to record the 'score' field of a rule.
+ */
+ public static final String THREAT_TRIAGE_RULE_SCORE = "score";
+
+ /**
+ * The portion of the message key used to record the 'reason' field of a rule.
+ */
+ public static final String THREAT_TRIAGE_RULE_REASON = "reason";
+
public static String getThreatIntelKey(String threatIntelName, String field) {
return Joiner.on(".").join(new String[]{KEY_PREFIX, threatIntelName, field});
}
+public static JSONObject triage(JSONObject ret, SensorEnrichmentConfig config, FunctionResolver functionResolver, Context stellarContext) {
+ LOG.trace("Received joined messages: {}", ret);
+ boolean isAlert = ret.containsKey("is_alert");
+ if(!isAlert) {
+ for (Object key : ret.keySet()) {
+ if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) {
+ isAlert = true;
+ break;
+ }
+ }
+ }
+ else {
+ Object isAlertObj = ret.get("is_alert");
+ isAlert = ConversionUtils.convert(isAlertObj, Boolean.class);
+ if(!isAlert) {
+ ret.remove("is_alert");
+ }
+ }
+ if(isAlert) {
+ ret.put("is_alert" , "true");
+ String sourceType = MessageUtils.getSensorType(ret);
+ ThreatTriageConfig triageConfig = null;
+ if(config != null) {
+ triageConfig = config.getThreatIntel().getTriageConfig();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found sensor enrichment config.", sourceType);
+ }
+ }
+ else {
+ LOG.debug("{}: Unable to find threat config.", sourceType );
+ }
+ if(triageConfig != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig);
+ }
+
+ if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) {
+ LOG.debug("{}: Empty rules!", sourceType);
+ }
+
+ // triage the threat
+ ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext);
+ ThreatScore score = threatTriageProcessor.apply(ret);
+
+ if(LOG.isDebugEnabled()) {
+ String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules());
+ LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(),
+ rules);
+ }
+
+ // attach the triage threat score to the message
+ if(score.getRuleScores().size() > 0) {
+ appendThreatScore(score, ret);
+ }
+ }
+ else {
+ LOG.debug("{}: Unable to find threat triage config!", sourceType);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Appends the threat score to the telemetry message.
+ * @param threatScore The threat triage score
+ * @param message The telemetry message being triaged.
+ */
+ private static void appendThreatScore(ThreatScore threatScore, JSONObject message) {
+ // append the overall threat score
+ message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore());
+
+ // append each of the rules - each rule is 'flat'
+ Joiner joiner = Joiner.on(".");
+ int i = 0;
+ for(RuleScore score: threatScore.getRuleScores()) {
+ message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName());
+ message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment());
+ message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore());
+ message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 828f4e3..267ca62 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -17,13 +17,6 @@
*/
package org.apache.metron.enrichment.integration;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULES_KEY;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_COMMENT;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_NAME;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_REASON;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_SCORE;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_SCORE_KEY;
-
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
@@ -54,6 +47,7 @@ import org.apache.metron.enrichment.integration.components.ConfigUploadComponent
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.BaseIntegrationTest;
@@ -89,13 +83,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
public static final String DEFAULT_DMACODE= "test dmaCode";
public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
- protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2";
protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
private static File geoHdfsFile;
+ protected String fluxPath() {
+ return "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
+ }
private static List<byte[]> getInputMessages(String path){
try{
@@ -190,7 +186,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
}});
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
- .withTopologyLocation(new File(fluxPath))
+ .withTopologyLocation(new File(fluxPath()))
.withTopologyName("test")
.withTemplateLocation(new File(templatePath))
.withTopologyProperties(topologyProperties)
@@ -247,8 +243,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
protected void validateErrors(List<Map<String, Object>> errors) {
for(Map<String, Object> error : errors) {
- Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName()));
- Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName()));
+ Assert.assertTrue(error.get(Constants.ErrorFields.MESSAGE.getName()).toString(), error.get(Constants.ErrorFields.MESSAGE.getName()).toString().contains("/ by zero") );
+ Assert.assertTrue(error.get(Constants.ErrorFields.EXCEPTION.getName()).toString().contains("/ by zero"));
Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName()));
Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
}
@@ -399,17 +395,17 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
Assert.assertEquals(indexedDoc.getOrDefault("is_alert",""), "true");
// validate threat triage score
- Assert.assertTrue(indexedDoc.containsKey(THREAT_TRIAGE_SCORE_KEY));
- Double score = (Double) indexedDoc.get(THREAT_TRIAGE_SCORE_KEY);
+ Assert.assertTrue(indexedDoc.containsKey(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY));
+ Double score = (Double) indexedDoc.get(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY);
Assert.assertEquals(score, 10d, 1e-7);
// validate threat triage rules
Joiner joiner = Joiner.on(".");
Stream.of(
- joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_NAME),
- joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_COMMENT),
- joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_REASON),
- joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_SCORE))
+ joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_NAME),
+ joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_COMMENT),
+ joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_REASON),
+ joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_SCORE))
.forEach(key ->
Assert.assertTrue(String.format("Missing expected key: '%s'", key), indexedDoc.containsKey(key)));
}
@@ -471,11 +467,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
enriched = true;
}
if (ips.contains(indexedDoc.get(DST_IP))) {
- Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+ boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION
,HostEnrichments.IMPORTANT
,HostEnrichments.PRINTER_TYPE
- ).apply(new EvaluationPayload(indexedDoc, DST_IP))
- );
+ ).apply(new EvaluationPayload(indexedDoc, DST_IP));
+ Assert.assertTrue(isEnriched);
enriched = true;
}
}
@@ -492,11 +488,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
enriched = true;
}
if (ips.contains(indexedDoc.get(DST_IP))) {
- Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+ boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION
,HostEnrichments.IMPORTANT
,HostEnrichments.WEBSERVER_TYPE
- ).apply(new EvaluationPayload(indexedDoc, DST_IP))
- );
+ ).apply(new EvaluationPayload(indexedDoc, DST_IP));
+ Assert.assertTrue(isEnriched);
enriched = true;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..1f06733
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration;
+
+public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+ @Override
+ public String fluxPath() {
+ return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml";
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
new file mode 100644
index 0000000..c3a3109
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.adapters.stellar.StellarAdapter;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ParallelEnricherTest {
+ /**
+ * {
+ "enrichment": {
+ "fieldMap": {
+ "stellar" : {
+ "config" : {
+ "numeric" : {
+ "map" : "{ 'blah' : 1}"
+ ,"one" : "MAP_GET('blah', map)"
+ ,"foo": "1 + 1"
+ }
+ ,"ALL_CAPS" : "TO_UPPER(source.type)"
+ }
+ }
+ }
+ ,"fieldToTypeMap": { }
+ },
+ "threatIntel": { }
+}
+ */
+ @Multiline
+ public static String goodConfig;
+
+ private static ParallelEnricher enricher;
+ private static Context stellarContext;
+ private static AtomicInteger numAccesses = new AtomicInteger(0);
+ @BeforeClass
+ public static void setup() {
+ ConcurrencyContext infrastructure = new ConcurrencyContext();
+ infrastructure.initialize(5, 100, 10, null, null, false);
+ stellarContext = new Context.Builder()
+ .build();
+ StellarFunctions.initialize(stellarContext);
+ StellarAdapter adapter = new StellarAdapter(){
+ @Override
+ public void logAccess(CacheKey value) {
+ numAccesses.incrementAndGet();
+ }
+ }.ofType("ENRICHMENT");
+ adapter.initializeAdapter(new HashMap<>());
+ enricher = new ParallelEnricher(ImmutableMap.of("stellar", adapter), infrastructure, false);
+ }
+
+ @Test
+ public void testCacheHit() throws Exception {
+ numAccesses.set(0);
+ JSONObject message = new JSONObject() {{
+ put(Constants.SENSOR_TYPE, "test");
+ }};
+ for(int i = 0;i < 10;++i) {
+ SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class);
+ config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+ ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+ }
+ //we only want 2 actual instances of the adapter.enrich being run due to the cache.
+ Assert.assertTrue(2 >= numAccesses.get());
+ }
+
+ @Test
+ public void testGoodConfig() throws Exception {
+ SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class);
+ config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+ JSONObject message = new JSONObject() {{
+ put(Constants.SENSOR_TYPE, "test");
+ }};
+ ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+ JSONObject ret = result.getResult();
+ Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size());
+ Assert.assertEquals(1, ret.get("map.blah"));
+ Assert.assertEquals("test", ret.get("source.type"));
+ Assert.assertEquals(1, ret.get("one"));
+ Assert.assertEquals(2, ret.get("foo"));
+ Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
+ Assert.assertEquals(0, result.getEnrichmentErrors().size());
+ }
+
+ /**
+ * {
+ "enrichment": {
+ "fieldMap": {
+ "stellar" : {
+ "config" : {
+ "numeric" : [
+ "map := { 'blah' : 1}"
+ ,"one := MAP_GET('blah', map)"
+ ,"foo := 1 + 1"
+ ]
+ ,"ALL_CAPS" : "TO_UPPER(source.type)"
+ ,"errors" : [
+ "error := 1/0"
+ ]
+ }
+ }
+ }
+ ,"fieldToTypeMap": { }
+ },
+ "threatIntel": { }
+}
+ */
+ @Multiline
+ public static String badConfig;
+
+ @Test
+ public void testBadConfig() throws Exception {
+ SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(badConfig, SensorEnrichmentConfig.class);
+ config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+ JSONObject message = new JSONObject() {{
+ put(Constants.SENSOR_TYPE, "test");
+ }};
+ ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+ JSONObject ret = result.getResult();
+ Assert.assertEquals(ret + " is not what I expected", 8, ret.size());
+ Assert.assertEquals(1, ret.get("map.blah"));
+ Assert.assertEquals("test", ret.get("source.type"));
+ Assert.assertEquals(1, ret.get("one"));
+ Assert.assertEquals(2, ret.get("foo"));
+ Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
+ Assert.assertEquals(1, result.getEnrichmentErrors().size());
+ }
+}