You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:33 UTC

[03/52] [abbrv] metron git commit: METRON-1460: Create a complementary non-split-join enrichment topology closes apache/metron#940

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
new file mode 100644
index 0000000..5f82b1c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+
+/**
+ * The strategy to use to construct the thread pool.
+ */
+public enum WorkerPoolStrategies {
+  /**
+   * Fixed thread pool
+   */
+  FIXED(numThreads -> Executors.newFixedThreadPool(numThreads)),
+  /**
+   * Work stealing thread pool.
+   */
+  WORK_STEALING(numThreads -> Executors.newWorkStealingPool(numThreads))
+  ;
+  Function<Integer, ExecutorService> creator;
+  WorkerPoolStrategies(Function<Integer, ExecutorService> creator) {
+    this.creator = creator;
+  }
+
+  public ExecutorService create(int numThreads) {
+    return creator.apply(numThreads);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index ab3d462..63d39c5 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
@@ -28,6 +29,7 @@ import org.apache.metron.enrichment.lookup.EnrichmentLookup;
 import org.apache.metron.enrichment.lookup.handler.KeyWithContext;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.json.simple.JSONObject;
 import sun.management.Sensor;
 
 import javax.annotation.Nullable;
@@ -118,4 +120,18 @@ public class EnrichmentUtils {
     }
   }
 
+  public static JSONObject adjustKeys(JSONObject enrichedMessage, JSONObject enrichedField, String field, String prefix) {
+    if ( !enrichedField.isEmpty()) {
+      for (Object enrichedKey : enrichedField.keySet()) {
+        if(!StringUtils.isEmpty(prefix)) {
+          enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey));
+        }
+        else {
+          enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey));
+        }
+      }
+    }
+    return enrichedMessage;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
index 7898ccd..870d709 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java
@@ -18,15 +18,142 @@
 package org.apache.metron.enrichment.utils;
 
 import com.google.common.base.Joiner;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore;
+import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore;
+import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.threatintel.triage.ThreatTriageProcessor;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
 
 public class ThreatIntelUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String KEY_PREFIX = "threatintels";
+ /**
+   * The message key under which the overall threat triage score is stored.
+   */
+  public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score";
+
+  /**
+   * The prefix of the message keys that record the threat triage rules that fired.
+   */
+  public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules";
+
+  /**
+   * The portion of the message key used to record the 'name' field of a rule.
+   */
+  public static final String THREAT_TRIAGE_RULE_NAME = "name";
+
+  /**
+   * The portion of the message key used to record the 'comment' field of a rule.
+   */
+  public static final String THREAT_TRIAGE_RULE_COMMENT = "comment";
+
+  /**
+   * The portion of the message key used to record the 'score' field of a rule.
+   */
+  public static final String THREAT_TRIAGE_RULE_SCORE = "score";
+
+  /**
+   * The portion of the message key used to record the 'reason' field of a rule.
+   */
+  public static final String THREAT_TRIAGE_RULE_REASON = "reason";
+
 
   public static String getThreatIntelKey(String threatIntelName, String field) {
     return Joiner.on(".").join(new String[]{KEY_PREFIX, threatIntelName, field});
   }
 
+public static JSONObject triage(JSONObject ret, SensorEnrichmentConfig config, FunctionResolver functionResolver, Context stellarContext) {
+    LOG.trace("Received joined messages: {}", ret);
+    boolean isAlert = ret.containsKey("is_alert");
+    if(!isAlert) {
+      for (Object key : ret.keySet()) {
+        if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) {
+          isAlert = true;
+          break;
+        }
+      }
+    }
+    else {
+      Object isAlertObj = ret.get("is_alert");
+      isAlert = ConversionUtils.convert(isAlertObj, Boolean.class);
+      if(!isAlert) {
+        ret.remove("is_alert");
+      }
+    }
+    if(isAlert) {
+      ret.put("is_alert" , "true");
+      String sourceType = MessageUtils.getSensorType(ret);
+      ThreatTriageConfig triageConfig = null;
+      if(config != null) {
+        triageConfig = config.getThreatIntel().getTriageConfig();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("{}: Found sensor enrichment config.", sourceType);
+        }
+      }
+      else {
+        LOG.debug("{}: Unable to find threat config.", sourceType );
+      }
+      if(triageConfig != null) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig);
+        }
+
+        if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) {
+          LOG.debug("{}: Empty rules!", sourceType);
+        }
+
+        // triage the threat
+        ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext);
+        ThreatScore score = threatTriageProcessor.apply(ret);
+
+        if(LOG.isDebugEnabled()) {
+          String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules());
+          LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(),
+              rules);
+        }
+
+        // attach the triage threat score to the message
+        if(score.getRuleScores().size() > 0) {
+          appendThreatScore(score, ret);
+        }
+      }
+      else {
+        LOG.debug("{}: Unable to find threat triage config!", sourceType);
+      }
+    }
+
+    return ret;
+  }
+
 
+  /**
+   * Appends the threat score to the telemetry message.
+   * @param threatScore The threat triage score
+   * @param message The telemetry message being triaged.
+   */
+  private static void appendThreatScore(ThreatScore threatScore, JSONObject message) {
 
+    // append the overall threat score
+    message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore());
+
+    // append each of the rules - each rule is 'flat'
+    Joiner joiner = Joiner.on(".");
+    int i = 0;
+    for(RuleScore score: threatScore.getRuleScores()) {
+      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName());
+      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment());
+      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore());
+      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 828f4e3..267ca62 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -17,13 +17,6 @@
  */
 package org.apache.metron.enrichment.integration;
 
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULES_KEY;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_COMMENT;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_NAME;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_REASON;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_SCORE;
-import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_SCORE_KEY;
-
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
@@ -54,6 +47,7 @@ import org.apache.metron.enrichment.integration.components.ConfigUploadComponent
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
 import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.integration.BaseIntegrationTest;
@@ -89,13 +83,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   public static final String DEFAULT_DMACODE= "test dmaCode";
   public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
 
-  protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
   protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
   private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
 
   private static File geoHdfsFile;
 
+  protected String fluxPath() {
+    return "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
+  }
 
   private static List<byte[]> getInputMessages(String path){
     try{
@@ -190,7 +186,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     }});
 
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(fluxPath))
+            .withTopologyLocation(new File(fluxPath()))
             .withTopologyName("test")
             .withTemplateLocation(new File(templatePath))
             .withTopologyProperties(topologyProperties)
@@ -247,8 +243,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
 
   protected void validateErrors(List<Map<String, Object>> errors) {
     for(Map<String, Object> error : errors) {
-      Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName()));
-      Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName()));
+      Assert.assertTrue(error.get(Constants.ErrorFields.MESSAGE.getName()).toString(), error.get(Constants.ErrorFields.MESSAGE.getName()).toString().contains("/ by zero") );
+      Assert.assertTrue(error.get(Constants.ErrorFields.EXCEPTION.getName()).toString().contains("/ by zero"));
       Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName()));
       Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
     }
@@ -399,17 +395,17 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       Assert.assertEquals(indexedDoc.getOrDefault("is_alert",""), "true");
 
       // validate threat triage score
-      Assert.assertTrue(indexedDoc.containsKey(THREAT_TRIAGE_SCORE_KEY));
-      Double score = (Double) indexedDoc.get(THREAT_TRIAGE_SCORE_KEY);
+      Assert.assertTrue(indexedDoc.containsKey(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY));
+      Double score = (Double) indexedDoc.get(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY);
       Assert.assertEquals(score, 10d, 1e-7);
 
       // validate threat triage rules
       Joiner joiner = Joiner.on(".");
       Stream.of(
-              joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_NAME),
-              joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_COMMENT),
-              joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_REASON),
-              joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_SCORE))
+              joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_NAME),
+              joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_COMMENT),
+              joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_REASON),
+              joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_SCORE))
               .forEach(key ->
                       Assert.assertTrue(String.format("Missing expected key: '%s'", key), indexedDoc.containsKey(key)));
     }
@@ -471,11 +467,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
         enriched = true;
       }
       if (ips.contains(indexedDoc.get(DST_IP))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+        boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
-        );
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP));
+        Assert.assertTrue(isEnriched);
         enriched = true;
       }
     }
@@ -492,11 +488,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
         enriched = true;
       }
       if (ips.contains(indexedDoc.get(DST_IP))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
+        boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
-        );
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP));
+        Assert.assertTrue(isEnriched);
         enriched = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..1f06733
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.integration;
+
+public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+  @Override
+  public String fluxPath() {
+    return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml";
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
new file mode 100644
index 0000000..c3a3109
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.adapters.stellar.StellarAdapter;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ParallelEnricherTest {
+  /**
+   * {
+  "enrichment": {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "numeric" : {
+                      "map" : "{ 'blah' : 1}"
+                      ,"one" : "MAP_GET('blah', map)"
+                      ,"foo": "1 + 1"
+                      }
+          ,"ALL_CAPS" : "TO_UPPER(source.type)"
+        }
+      }
+    }
+  ,"fieldToTypeMap": { }
+  },
+  "threatIntel": { }
+}
+   */
+  @Multiline
+  public static String goodConfig;
+
+  private static ParallelEnricher enricher;
+  private static Context stellarContext;
+  private static AtomicInteger numAccesses = new AtomicInteger(0);
+  @BeforeClass
+  public static void setup() {
+    ConcurrencyContext infrastructure = new ConcurrencyContext();
+    infrastructure.initialize(5, 100, 10, null, null, false);
+    stellarContext = new Context.Builder()
+                         .build();
+    StellarFunctions.initialize(stellarContext);
+    StellarAdapter adapter = new StellarAdapter(){
+      @Override
+      public void logAccess(CacheKey value) {
+        numAccesses.incrementAndGet();
+      }
+    }.ofType("ENRICHMENT");
+    adapter.initializeAdapter(new HashMap<>());
+    enricher = new ParallelEnricher(ImmutableMap.of("stellar", adapter), infrastructure, false);
+  }
+
+  @Test
+  public void testCacheHit() throws Exception {
+    numAccesses.set(0);
+    JSONObject message = new JSONObject() {{
+      put(Constants.SENSOR_TYPE, "test");
+    }};
+    for(int i = 0;i < 10;++i) {
+      SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class);
+      config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+      ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+    }
+    //we only want 2 actual instances of the adapter.enrich being run due to the cache.
+    Assert.assertTrue(2 >= numAccesses.get());
+  }
+
+  @Test
+  public void testGoodConfig() throws Exception {
+    SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class);
+    config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+    JSONObject message = new JSONObject() {{
+      put(Constants.SENSOR_TYPE, "test");
+    }};
+    ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+    JSONObject ret = result.getResult();
+    Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size());
+    Assert.assertEquals(1, ret.get("map.blah"));
+    Assert.assertEquals("test", ret.get("source.type"));
+    Assert.assertEquals(1, ret.get("one"));
+    Assert.assertEquals(2, ret.get("foo"));
+    Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
+    Assert.assertEquals(0, result.getEnrichmentErrors().size());
+  }
+
+  /**
+   * {
+  "enrichment": {
+    "fieldMap": {
+      "stellar" : {
+        "config" : {
+          "numeric" : [
+                      "map := { 'blah' : 1}"
+                      ,"one := MAP_GET('blah', map)"
+                      ,"foo := 1 + 1"
+                      ]
+          ,"ALL_CAPS" : "TO_UPPER(source.type)"
+          ,"errors" : [
+            "error := 1/0"
+          ]
+        }
+      }
+    }
+  ,"fieldToTypeMap": { }
+  },
+  "threatIntel": { }
+}
+   */
+  @Multiline
+  public static String badConfig;
+
+  @Test
+  public void testBadConfig() throws Exception {
+    SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(badConfig, SensorEnrichmentConfig.class);
+    config.getConfiguration().putIfAbsent("stellarContext", stellarContext);
+    JSONObject message = new JSONObject() {{
+      put(Constants.SENSOR_TYPE, "test");
+    }};
+    ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
+    JSONObject ret = result.getResult();
+    Assert.assertEquals(ret + " is not what I expected", 8, ret.size());
+    Assert.assertEquals(1, ret.get("map.blah"));
+    Assert.assertEquals("test", ret.get("source.type"));
+    Assert.assertEquals(1, ret.get("one"));
+    Assert.assertEquals(2, ret.get("foo"));
+    Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
+    Assert.assertEquals(1, result.getEnrichmentErrors().size());
+  }
+}