You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/12/04 14:27:38 UTC
metron git commit: METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286
Repository: metron
Updated Branches:
refs/heads/master b4d76f98e -> 4ef65e09e
METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4ef65e09
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4ef65e09
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4ef65e09
Branch: refs/heads/master
Commit: 4ef65e09ea4a1eac8abf89521e5a999faeca1f37
Parents: b4d76f9
Author: mmiklavc <mi...@gmail.com>
Authored: Tue Dec 4 07:27:33 2018 -0700
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Tue Dec 4 07:27:33 2018 -0700
----------------------------------------------------------------------
.../enrichment/parallel/ParallelEnricher.java | 10 +-
.../enrichment/utils/EnrichmentUtils.java | 13 +--
.../parallel/ParallelEnricherTest.java | 104 ++++++++++++-------
3 files changed, 77 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
index b10c148..1de8945 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
@@ -157,6 +157,7 @@ public class ParallelEnricher {
throw new IllegalStateException("Unable to find an adapter for " + task.getKey()
+ ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet()));
}
+ message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
for(JSONObject m : task.getValue()) {
/* now for each unit of work (each of these only has one element in them)
* the key is the field name and the value is value associated with that field.
@@ -171,6 +172,7 @@ public class ParallelEnricher {
String field = (String) o;
Object value = m.get(o);
if(value == null) {
+ message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
continue;
}
CacheKey cacheKey = new CacheKey(field, value, config);
@@ -182,7 +184,10 @@ public class ParallelEnricher {
ret = new JSONObject();
}
//each enrichment has their own unique prefix to use to adjust the keys for the enriched fields.
- return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
+ JSONObject adjustedKeys = EnrichmentUtils
+ .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
+ adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
+ return adjustedKeys;
} catch (Throwable e) {
JSONObject errorMessage = new JSONObject();
errorMessage.putAll(m);
@@ -197,11 +202,12 @@ public class ParallelEnricher {
}
}
if(taskList.isEmpty()) {
+ message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
return new EnrichmentResult(message, errors);
}
EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors);
- message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
+ ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis());
if(perfLog != null) {
String key = message.get(Constants.GUID) + "";
perfLog.log("enrich", "key={}, elapsed time to enrich", key);
http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index 63d39c5..9a36a87 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -21,21 +21,18 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.lookup.EnrichmentLookup;
import org.apache.metron.enrichment.lookup.handler.KeyWithContext;
import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.json.simple.JSONObject;
-import sun.management.Sensor;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Map;
public class EnrichmentUtils {
http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
index d4fcdf4..a6832d6 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java
@@ -65,51 +65,57 @@ public class ParallelEnricherTest {
private static Context stellarContext;
private static AtomicInteger numAccesses = new AtomicInteger(0);
private static Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType;
- @BeforeClass
- public static void setup() {
- ConcurrencyContext infrastructure = new ConcurrencyContext();
- infrastructure.initialize(5, 100, 10, null, null, false);
- stellarContext = new Context.Builder()
- .build();
- StellarFunctions.initialize(stellarContext);
- StellarAdapter adapter = new StellarAdapter(){
- @Override
- public void logAccess(CacheKey value) {
- numAccesses.incrementAndGet();
- }
- }.ofType("ENRICHMENT");
- adapter.initializeAdapter(new HashMap<>());
- EnrichmentAdapter<CacheKey> dummy = new EnrichmentAdapter<CacheKey>() {
- @Override
- public void logAccess(CacheKey value) {
+ // Declaring an explicit class because getClass().getSimpleName() returns "" for anonymous classes
+ public static class DummyEnrichmentAdapter implements EnrichmentAdapter<CacheKey> {
+ @Override
+ public void logAccess(CacheKey value) {
- }
+ }
- @Override
- public JSONObject enrich(CacheKey value) {
- return null;
- }
+ @Override
+ public JSONObject enrich(CacheKey value) {
+ return null;
+ }
- @Override
- public boolean initializeAdapter(Map<String, Object> config) {
- return false;
- }
+ @Override
+ public boolean initializeAdapter(Map<String, Object> config) {
+ return false;
+ }
- @Override
- public void updateAdapter(Map<String, Object> config) {
+ @Override
+ public void updateAdapter(Map<String, Object> config) {
- }
+ }
- @Override
- public void cleanup() {
+ @Override
+ public void cleanup() {
- }
+ }
- @Override
- public String getOutputPrefix(CacheKey value) {
- return null;
- }
- };
+ @Override
+ public String getOutputPrefix(CacheKey value) {
+ return null;
+ }
+ }
+
+ // Declaring an explicit class because getClass().getSimpleName() returns "" for anonymous classes
+ public static class AccessLoggingStellarAdapter extends StellarAdapter {
+ @Override
+ public void logAccess(CacheKey value) {
+ numAccesses.incrementAndGet();
+ }
+ }
+
+ @BeforeClass
+ public static void setup() {
+ ConcurrencyContext infrastructure = new ConcurrencyContext();
+ infrastructure.initialize(5, 100, 10, null, null, false);
+ stellarContext = new Context.Builder()
+ .build();
+ StellarFunctions.initialize(stellarContext);
+ StellarAdapter adapter = new AccessLoggingStellarAdapter().ofType("ENRICHMENT");
+ adapter.initializeAdapter(new HashMap<>());
+ EnrichmentAdapter<CacheKey> dummy = new DummyEnrichmentAdapter();
enrichmentsByType = ImmutableMap.of("stellar", adapter, "dummy", dummy);
enricher = new ParallelEnricher(enrichmentsByType, infrastructure, false);
@@ -139,13 +145,19 @@ public class ParallelEnricherTest {
}};
ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
JSONObject ret = result.getResult();
- Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size());
+ Assert.assertEquals("Got the wrong result count: " + ret, 11, ret.size());
Assert.assertEquals(1, ret.get("map.blah"));
Assert.assertEquals("test", ret.get("source.type"));
Assert.assertEquals(1, ret.get("one"));
Assert.assertEquals(2, ret.get("foo"));
Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
Assert.assertEquals(0, result.getEnrichmentErrors().size());
+ Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
}
/**
* {
@@ -170,7 +182,13 @@ public class ParallelEnricherTest {
}};
ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
JSONObject ret = result.getResult();
- Assert.assertEquals("Got the wrong result count: " + ret, 4, ret.size());
+ Assert.assertEquals("Got the wrong result count: " + ret, 7, ret.size());
+ Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
}
/**
@@ -208,13 +226,19 @@ public class ParallelEnricherTest {
}};
ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null);
JSONObject ret = result.getResult();
- Assert.assertEquals(ret + " is not what I expected", 8, ret.size());
+ Assert.assertEquals(ret + " is not what I expected", 11, ret.size());
Assert.assertEquals(1, ret.get("map.blah"));
Assert.assertEquals("test", ret.get("source.type"));
Assert.assertEquals(1, ret.get("one"));
Assert.assertEquals(2, ret.get("foo"));
Assert.assertEquals("TEST", ret.get("ALL_CAPS"));
Assert.assertEquals(1, result.getEnrichmentErrors().size());
+ Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts"));
+ Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts"));
}
/**