You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/12/04 14:27:38 UTC

metron git commit: METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286

Repository: metron
Updated Branches:
  refs/heads/master b4d76f98e -> 4ef65e09e


METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4ef65e09
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4ef65e09
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4ef65e09

Branch: refs/heads/master
Commit: 4ef65e09ea4a1eac8abf89521e5a999faeca1f37
Parents: b4d76f9
Author: mmiklavc <mi...@gmail.com>
Authored: Tue Dec 4 07:27:33 2018 -0700
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue Dec 4 07:27:33 2018 -0700

----------------------------------------------------------------------
 .../enrichment/parallel/ParallelEnricher.java   |  10 +-
 .../enrichment/utils/EnrichmentUtils.java       |  13 +--
 .../parallel/ParallelEnricherTest.java          | 104 ++++++++++++-------
 3 files changed, 77 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
index b10c148..1de8945 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
@@ -157,6 +157,7 @@ public class ParallelEnricher {
         throw new IllegalStateException("Unable to find an adapter for " + task.getKey()
                 + ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet()));
       }
+      message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
       for(JSONObject m : task.getValue()) {
         /* now for each unit of work (each of these only has one element in them)
          * the key is the field name and the value is value associated with that field.
@@ -171,6 +172,7 @@ public class ParallelEnricher {
           String field = (String) o;
           Object value = m.get(o);
           if(value == null) {
+            message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
             continue;
           }
           CacheKey cacheKey = new CacheKey(field, value, config);
@@ -182,7 +184,10 @@ public class ParallelEnricher {
                 ret = new JSONObject();
               }
               //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields.
-              return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
+              JSONObject adjustedKeys = EnrichmentUtils
+                  .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
+              adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
+              return adjustedKeys;
             } catch (Throwable e) {
               JSONObject errorMessage = new JSONObject();
               errorMessage.putAll(m);
@@ -197,11 +202,12 @@ public class ParallelEnricher {
       }
     }
     if(taskList.isEmpty()) {
+      message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
       return new EnrichmentResult(message, errors);
     }
 
     EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors);
-    message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
+    ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
     if(perfLog != null) {
       String key = message.get(Constants.GUID) + "";
       perfLog.log("enrich", "key={}, elapsed time to enrich", key);

http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index 63d39c5..9a36a87 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -21,21 +21,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.lookup.EnrichmentLookup;
 import org.apache.metron.enrichment.lookup.handler.KeyWithContext;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.json.simple.JSONObject;
-import sun.management.Sensor;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Map;
 
 public class EnrichmentUtils {
 

http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
index d4fcdf4..a6832d6 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
@@ -65,51 +65,57 @@ public class ParallelEnricherTest {
   private static Context stellarContext;
   private static AtomicInteger numAccesses = new AtomicInteger(0);
   private static Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType;
-  @BeforeClass
-  public static void setup() {
-    ConcurrencyContext infrastructure = new ConcurrencyContext();
-    infrastructure.initialize(5, 100, 10, null, null, false);
-    stellarContext = new Context.Builder()
-                         .build();
-    StellarFunctions.initialize(stellarContext);
-    StellarAdapter adapter = new StellarAdapter(){
-      @Override
-      public void logAccess(CacheKey value) {
-        numAccesses.incrementAndGet();
-      }
-    }.ofType("ENRICHMENT");
-    adapter.initializeAdapter(new HashMap<>());
-    EnrichmentAdapter<CacheKey> dummy = new EnrichmentAdapter<CacheKey>() {
-      @Override
-      public void logAccess(CacheKey value) {
+  // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes
+  public static class DummyEnrichmentAdapter implements EnrichmentAdapter<CacheKey> {
+    @Override
+    public void logAccess(CacheKey value) {
 
-      }
+    }
 
-      @Override
-      public JSONObject enrich(CacheKey value) {
-        return null;
-      }
+    @Override
+    public JSONObject enrich(CacheKey value) {
+      return null;
+    }
 
-      @Override
-      public boolean initializeAdapter(Map<String, Object> config) {
-        return false;
-      }
+    @Override
+    public boolean initializeAdapter(Map<String, Object> config) {
+      return false;
+    }
 
-      @Override
-      public void updateAdapter(Map<String, Object> config) {
+    @Override
+    public void updateAdapter(Map<String, Object> config) {
 
-      }
+    }
 
-      @Override
-      public void cleanup() {
+    @Override
+    public void cleanup() {
 
-      }
+    }
 
-      @Override
-      public String getOutputPrefix(CacheKey value) {
-        return null;
-      }
-    };
+    @Override
+    public String getOutputPrefix(CacheKey value) {
+      return null;
+    }
+  }
+
+  // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes
+  public static class AccessLoggingStellarAdapter extends StellarAdapter {
+    @Override
+    public void logAccess(CacheKey value) {
+      numAccesses.incrementAndGet();
+    }
+  }
+
+  @BeforeClass
+  public static void setup() {
+    ConcurrencyContext infrastructure = new ConcurrencyContext();
+    infrastructure.initialize(5, 100, 10, null, null, false);
+    stellarContext = new Context.Builder()
+                         .build();
+    StellarFunctions.initialize(stellarContext);
+    StellarAdapter adapter = new AccessLoggingStellarAdapter().ofType("ENRICHMENT");
+    adapter.initializeAdapter(new HashMap<>());
+    EnrichmentAdapter<CacheKey> dummy = new DummyEnrichmentAdapter();
 
     enrichmentsByType = ImmutableMap.of("stellar", adapter, "dummy", dummy);
     enricher = new ParallelEnricher(enrichmentsByType, infrastructure, false);
@@ -139,13 +145,19 @@ public class ParallelEnricherTest {
     }};
     ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
     JSONObject ret = result.getResult();
-    Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size());
+    Assert.assertEquals("Got the wrong result count: " + ret, 11, ret.size());
     Assert.assertEquals(1, ret.get("map.blah"));
     Assert.assertEquals("test", ret.get("source.type"));
     Assert.assertEquals(1, ret.get("one"));
     Assert.assertEquals(2, ret.get("foo"));
     Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
     Assert.assertEquals(0, result.getEnrichmentErrors().size());
+    Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
   }
 /**
    * {
@@ -170,7 +182,13 @@ public class ParallelEnricherTest {
     }};
     ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
     JSONObject ret = result.getResult();
-    Assert.assertEquals("Got the wrong result count: " + ret, 4, ret.size());
+    Assert.assertEquals("Got the wrong result count: " + ret, 7, ret.size());
+    Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
   }
 
   /**
@@ -208,13 +226,19 @@ public class ParallelEnricherTest {
     }};
     ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
     JSONObject ret = result.getResult();
-    Assert.assertEquals(ret + " is not what I expected", 8, ret.size());
+    Assert.assertEquals(ret + " is not what I expected", 11, ret.size());
     Assert.assertEquals(1, ret.get("map.blah"));
     Assert.assertEquals("test", ret.get("source.type"));
     Assert.assertEquals(1, ret.get("one"));
     Assert.assertEquals(2, ret.get("foo"));
     Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
     Assert.assertEquals(1, result.getEnrichmentErrors().size());
+    Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+    Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
   }
 
   /**