You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/13 22:39:01 UTC
incubator-metron git commit: METRON-397: Add a stellar function to
interact with the HBase enrichment table closes apache/incubator-metron#234
Repository: incubator-metron
Updated Branches:
refs/heads/master ef3e9fa5e -> 6fb85e124
METRON-397: Add a stellar function to interact with the HBase enrichment table closes apache/incubator-metron#234
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/6fb85e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/6fb85e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/6fb85e12
Branch: refs/heads/master
Commit: 6fb85e124d87581c37b79ea105570f4f85b59384
Parents: ef3e9fa
Author: cstella <ce...@gmail.com>
Authored: Tue Sep 13 18:38:51 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Sep 13 18:38:51 2016 -0400
----------------------------------------------------------------------
.../org/apache/metron/common/dsl/Context.java | 21 +-
.../common/dsl/functions/DateFunctions.java | 1 +
.../adapters/stellar/StellarAdapter.java | 10 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 1 +
.../accesstracker/AccessTrackerCreator.java | 27 ++
.../lookup/accesstracker/AccessTrackers.java | 39 +++
.../PersistentBloomTrackerCreator.java | 101 +++++++
.../stellar/SimpleHBaseEnrichmentFunctions.java | 275 +++++++++++++++++++
.../integration/EnrichmentIntegrationTest.java | 42 ++-
.../components/ConfigUploadComponent.java | 6 +-
.../SimpleHBaseEnrichmentFunctionsTest.java | 114 ++++++++
.../main/config/zookeeper/enrichments/test.json | 7 +
.../apache/metron/parsers/bolt/ParserBolt.java | 1 +
.../test/bolt/BaseEnrichmentBoltTest.java | 2 +
.../org/apache/metron/test/mock/MockHTable.java | 1 +
15 files changed, 630 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index aa8c9a2..bd43713 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -27,12 +27,13 @@ public class Context implements Serializable {
public interface Capability {
Object get();
}
-
+
public enum Capabilities {
- HBASE_PROVIDER,
- ZOOKEEPER_CLIENT,
- SERVICE_DISCOVERER,
- GLOBAL_CONFIG
+ SENSOR_CONFIG
+ , HBASE_PROVIDER
+ , GLOBAL_CONFIG
+ , ZOOKEEPER_CLIENT
+ , SERVICE_DISCOVERER;
}
public static class Builder {
@@ -48,6 +49,14 @@ public class Context implements Serializable {
capabilityMap.put(s.toString(), capability);
return this;
}
+
+ /**
+  * Registers every entry of {@code externalConfig} as a capability, keyed by the entry's key.
+  * The capability does not copy the value: it reads it from the captured map entry each time
+  * {@code get()} is invoked.
+  *
+  * @param externalConfig capability names mapped to the objects to expose
+  * @return this builder, to allow call chaining
+  */
+ public Builder withAll(Map<String, Object> externalConfig) {
+ for(Map.Entry<String, Object> entry : externalConfig.entrySet()) {
+
+ capabilityMap.put(entry.getKey(), () -> entry.getValue());
+ }
+ return this;
+ }
public Context build() {
return new Context(capabilityMap);
@@ -89,4 +98,4 @@ public class Context implements Serializable {
public void addCapability(Enum<?> s, Capability capability) {
this.capabilities.put(s.toString(), capability);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
index e28afd3..d83e468 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
@@ -109,6 +109,7 @@ public class DateFunctions {
return sdf.parse(date).getTime();
}
+
/**
* Stellar Function: TO_EPOCH_TIMESTAMP
*/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
index 3ba7755..9fa7363 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
@@ -90,7 +90,15 @@ public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable
if(kv.getValue() instanceof String) {
String stellarStatement = (String) kv.getValue();
Object o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
- message.put(kv.getKey(), o);
+ if(o != null && o instanceof Map) {
+ for(Map.Entry<Object, Object> valueKv : ((Map<Object, Object>)o).entrySet()) {
+ String newKey = ((kv.getKey().length() > 0)?kv.getKey() + "." : "" )+ valueKv.getKey();
+ message.put(newKey, valueKv.getValue());
+ }
+ }
+ else {
+ message.put(kv.getKey(), o);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 6fad0f8..059817a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -158,6 +158,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
protected void initializeStellar() {
stellarContext = new Context.Builder()
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
.build();
StellarFunctions.initialize(stellarContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
new file mode 100644
index 0000000..f4d0c4c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Factory abstraction for building {@link AccessTracker} instances.
+ */
+public interface AccessTrackerCreator {
+  /**
+   * Builds an access tracker.
+   *
+   * @param config   configuration map handed to the tracker implementation
+   * @param provider table provider made available to the tracker implementation
+   * @return the created tracker
+   * @throws IOException if the tracker cannot be created
+   */
+  AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
new file mode 100644
index 0000000..7dad6eb
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The access tracker implementations that can be selected by name. Each constant
+ * delegates {@link #create(Map, TableProvider)} to the creator it was constructed with.
+ */
+public enum AccessTrackers implements AccessTrackerCreator {
+  /** Always yields a new {@link NoopAccessTracker}; the config and provider are ignored. */
+  NOOP((config, provider) -> new NoopAccessTracker()),
+  /** Delegates to a {@link PersistentBloomTrackerCreator}. */
+  PERSISTENT_BLOOM(new PersistentBloomTrackerCreator());
+
+  // Enum constants are shared singletons, so the delegate must never be reassigned.
+  private final AccessTrackerCreator creator;
+
+  AccessTrackers(AccessTrackerCreator creator) {
+    this.creator = creator;
+  }
+
+  /**
+   * Creates a tracker using the creator bound to this constant.
+   *
+   * @param config   configuration map passed through to the creator
+   * @param provider table provider passed through to the creator
+   * @return the tracker built by the underlying creator
+   * @throws IOException if the underlying creator fails
+   */
+  @Override
+  public AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException {
+    return creator.create(config, provider);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
new file mode 100644
index 0000000..02d0c7f
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+public class PersistentBloomTrackerCreator implements AccessTrackerCreator {
+
+ public static class Config {
+ public static final String PERSISTENT_BLOOM_TABLE = "pbat.table";
+ public static final String PERSISTENT_BLOOM_CF = "pbat.cf";
+ public static final String PERSISTENT_BLOOM_FP = "pbat.false_positive";
+ public static final String PERSISTENT_BLOOM_EI = "pbat.expected_insertions";
+ public static final String PERSISTENT_BLOOM_MS_BETWEEN_PERSISTS = "pbat.ms_between_persists";
+ public static final long MS_IN_HOUR = 10000*60*60;
+ private String hBaseTable;
+ private String hBaseCF;
+ private double falsePositiveRate = 0.03;
+ private int expectedInsertions = 100000;
+ private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
+ public Config(Map<String, Object> config) {
+ hBaseTable = (String) config.get(PERSISTENT_BLOOM_TABLE);
+ hBaseCF = (String) config.get(PERSISTENT_BLOOM_CF);
+ Object fpObj = config.get(PERSISTENT_BLOOM_FP);
+ if (fpObj != null) {
+ falsePositiveRate = ConversionUtils.convert(fpObj, Double.class);
+ }
+ Object eiObj = config.get(PERSISTENT_BLOOM_EI);
+ if (eiObj != null) {
+ expectedInsertions = ConversionUtils.convert(eiObj, Integer.class);
+ }
+ Object msObj = config.get(PERSISTENT_BLOOM_MS_BETWEEN_PERSISTS);
+ if(msObj != null) {
+ millisecondsBetweenPersists = ConversionUtils.convert(msObj, Long.class);
+ }
+ }
+
+ public String getHBaseTable() {
+ return hBaseTable;
+ }
+
+ public String getHBaseCF() {
+ return hBaseCF;
+ }
+
+ public double getFalsePositiveRate() {
+ return falsePositiveRate;
+ }
+
+ public int getExpectedInsertions() {
+ return expectedInsertions;
+ }
+
+
+ public long getMillisecondsBetweenPersists() {
+ return millisecondsBetweenPersists;
+ }
+ }
+
+ @Override
+ public AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException {
+ Config patConfig = new Config(config);
+ String hbaseTable = patConfig.getHBaseTable();
+ int expectedInsertions = patConfig.getExpectedInsertions();
+ double falsePositives = patConfig.getFalsePositiveRate();
+ long millisecondsBetweenPersist = patConfig.getMillisecondsBetweenPersists();
+ BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
+ Configuration hbaseConfig = HBaseConfiguration.create();
+
+ AccessTracker ret = new PersistentAccessTracker( hbaseTable
+ , UUID.randomUUID().toString()
+ , provider.getTable(hbaseConfig, hbaseTable)
+ , patConfig.getHBaseCF()
+ , bat
+ , millisecondsBetweenPersist
+ );
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
new file mode 100644
index 0000000..6d21d03
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.EnrichmentLookup;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackers;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class SimpleHBaseEnrichmentFunctions {
+ private static final Logger LOG = Logger.getLogger(SimpleHBaseEnrichmentFunctions.class);
+ public static final String ACCESS_TRACKER_TYPE_CONF = "accessTracker";
+ public static final String TABLE_PROVIDER_TYPE_CONF = "tableProviderImpl";
+ private static AccessTracker tracker;
+ private static TableProvider provider;
+
+
+  /**
+   * Pairs a table name with a column family. Equality and hashing cover both fields
+   * (either may be null), so instances can serve as map or cache keys.
+   */
+  private static class Table {
+    String name;
+    String columnFamily;
+
+    public Table(String name, String columnFamily) {
+      this.name = name;
+      this.columnFamily = columnFamily;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("Table{");
+      text.append("name='").append(name).append('\'');
+      text.append(", columnFamily='").append(columnFamily).append('\'');
+      text.append('}');
+      return text.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      Table other = (Table) o;
+      return sameOrBothNull(name, other.name)
+          && sameOrBothNull(columnFamily, other.columnFamily);
+    }
+
+    /** Null-tolerant string equality: two nulls are equal, a null never equals a non-null. */
+    private static boolean sameOrBothNull(String left, String right) {
+      return left == null ? right == null : left.equals(right);
+    }
+
+    @Override
+    public int hashCode() {
+      int nameHash = (name == null) ? 0 : name.hashCode();
+      int familyHash = (columnFamily == null) ? 0 : columnFamily.hashCode();
+      return 31 * nameHash + familyHash;
+    }
+  }
+
+
+  /**
+   * Fetches the GLOBAL_CONFIG capability from the context as a map.
+   *
+   * @param context the Stellar context to read from
+   * @return the capability value cast to a map, or a new empty map when the capability is absent
+   */
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getConfig(Context context) {
+    Object globalConfig = context.getCapability(Context.Capabilities.GLOBAL_CONFIG)
+                                 .orElse(new HashMap<>());
+    return (Map<String, Object>) globalConfig;
+  }
+
+  /**
+   * Creates the shared access tracker on first use; later calls are no-ops.
+   * The tracker type is read from the {@code accessTracker} config key and defaults to NOOP.
+   *
+   * @param config   configuration holding the tracker type and tracker-specific settings
+   * @param provider table provider handed to the tracker creator
+   * @throws IOException if the selected tracker cannot be created
+   */
+  private static synchronized void initializeTracker(Map<String, Object> config, TableProvider provider) throws IOException {
+    if (tracker != null) {
+      return;
+    }
+    Object configuredType = config.getOrDefault(ACCESS_TRACKER_TYPE_CONF, AccessTrackers.NOOP.toString());
+    AccessTrackers trackerType = AccessTrackers.valueOf((String) configuredType);
+    tracker = trackerType.create(config, provider);
+  }
+
+ private static TableProvider createProvider(String tableProviderClass) {
+ try {
+ Class<? extends TableProvider> providerClazz = (Class<? extends TableProvider>) Class.forName(tableProviderClass);
+ return providerClazz.newInstance();
+ } catch (Exception e) {
+ return new HTableProvider();
+ }
+ }
+
+  /**
+   * Creates the shared table provider on first use; later calls leave it untouched.
+   * The implementation class is read from the {@code tableProviderImpl} config key and
+   * defaults to {@link HTableProvider}.
+   *
+   * @param config configuration that may name the provider implementation
+   */
+  private static synchronized void initializeProvider(Map<String, Object> config) {
+    if (provider == null) {
+      Object providerClassName = config.getOrDefault(TABLE_PROVIDER_TYPE_CONF, HTableProvider.class.getName());
+      provider = createProvider((String) providerClassName);
+    }
+  }
+
+  /**
+   * Stellar function (namespace ENRICHMENT, name EXISTS) that reports whether an enrichment
+   * type / indicator pair is present in an HBase enrichment table.
+   *
+   * One {@link EnrichmentLookup} is cached per (table, column family) pair; the cache has no
+   * size or expiry settings configured.
+   */
+  @Stellar(name="EXISTS"
+          ,namespace="ENRICHMENT"
+          ,description="Interrogates the HBase table holding the simple hbase enrichment data and returns whether the" +
+          " enrichment type and indicator are in the table."
+          ,params = {
+                      "enrichment_type - The enrichment type"
+                     ,"indicator - The string indicator to look up"
+                     ,"nosql_table - The NoSQL Table to use"
+                     ,"column_family - The Column Family to use"
+                    }
+          ,returns = "True if the enrichment indicator exists and false otherwise"
+          )
+  public static class EnrichmentExists implements StellarFunction {
+    boolean initialized = false;
+    private static final Cache<Table, EnrichmentLookup> enrichmentCollateralCache = CacheBuilder.newBuilder()
+                                                                                              .build();
+
+    /**
+     * Looks up the enrichment type / indicator pair.
+     *
+     * @param args    enrichment type, indicator, table name and column family, in that order
+     * @param context the Stellar context (unused here)
+     * @return {@code false} when the function is not initialized, when the enrichment type or
+     *         indicator is null, or when the lookup fails; otherwise the result of the lookup
+     * @throws IllegalStateException if fewer than four arguments are supplied
+     */
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(!initialized) {
+        return false;
+      }
+      // All four arguments are read below, so validate the full arity up front instead of
+      // failing later with an IndexOutOfBoundsException.
+      if(args.size() < 4) {
+        throw new IllegalStateException("Requires an enrichment type, indicator, NoSQL table and column family");
+      }
+      int i = 0;
+      String enrichmentType = (String) args.get(i++);
+      String indicator = (String) args.get(i++);
+      String table = (String) args.get(i++);
+      String cf = (String) args.get(i++);
+      if(enrichmentType == null || indicator == null) {
+        return false;
+      }
+      final Table key = new Table(table, cf);
+      EnrichmentLookup lookup = null;
+      try {
+        // Build the lookup for this table/column family only once and reuse it afterwards.
+        lookup = enrichmentCollateralCache.get(key, () -> {
+                  HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name);
+                  return new EnrichmentLookup(hTable, key.columnFamily, tracker);
+                }
+        );
+      } catch (ExecutionException e) {
+        LOG.error("Unable to retrieve enrichmentLookup: " + e.getMessage(), e);
+        return false;
+      }
+      EnrichmentLookup.HBaseContext hbaseContext = new EnrichmentLookup.HBaseContext(lookup.getTable(), cf);
+      try {
+        return lookup.exists(new EnrichmentKey(enrichmentType, indicator), hbaseContext, true);
+      } catch (IOException e) {
+        LOG.error("Unable to call exists: " + e.getMessage(), e);
+        return false;
+      }
+    }
+
+    /**
+     * Sets up the shared table provider and access tracker from the global config.
+     * The function is marked initialized even when tracker creation fails; the failure is
+     * only logged.
+     *
+     * @param context the Stellar context supplying the global config
+     */
+    @Override
+    public void initialize(Context context) {
+      try {
+        Map<String, Object> config = getConfig(context);
+        initializeProvider(config);
+        initializeTracker(config, provider);
+      } catch (IOException e) {
+        LOG.error("Unable to initialize ENRICHMENT.EXISTS: " + e.getMessage(), e);
+      }
+      finally{
+        initialized = true;
+      }
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+  }
+
+
+
+  /**
+   * Stellar function (namespace ENRICHMENT, name GET) that retrieves the metadata map stored
+   * for an enrichment type / indicator pair in an HBase enrichment table.
+   *
+   * One {@link EnrichmentLookup} is cached per (table, column family) pair; the cache has no
+   * size or expiry settings configured.
+   */
+  @Stellar(name="GET"
+          ,namespace="ENRICHMENT"
+          ,description="Interrogates the HBase table holding the simple hbase enrichment data and retrieves the " +
+          "tabular value associated with the enrichment type and indicator."
+          ,params = {
+                      "enrichment_type - The enrichment type"
+                     ,"indicator - The string indicator to look up"
+                     ,"nosql_table - The NoSQL Table to use"
+                     ,"column_family - The Column Family to use"
+                    }
+          ,returns = "A Map associated with the indicator and enrichment type.  Empty otherwise."
+          )
+  public static class EnrichmentGet implements StellarFunction {
+    boolean initialized = false;
+    private static final Cache<Table, EnrichmentLookup> enrichmentCollateralCache = CacheBuilder.newBuilder()
+                                                                                              .build();
+
+    /**
+     * Retrieves the metadata for the enrichment type / indicator pair.
+     *
+     * @param args    enrichment type, indicator, table name and column family, in that order
+     * @param context the Stellar context (unused here)
+     * @return the stored metadata map; an empty map when the function is not initialized, when
+     *         the enrichment type or indicator is null, when nothing is found, or when the
+     *         lookup fails
+     * @throws IllegalStateException if fewer than four arguments are supplied
+     */
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(!initialized) {
+        // Honor the documented contract: this function always yields a Map, never a boolean.
+        return new HashMap<String, Object>();
+      }
+      // All four arguments are read below, so validate the full arity up front instead of
+      // failing later with an IndexOutOfBoundsException.
+      if(args.size() < 4) {
+        throw new IllegalStateException("Requires an enrichment type, indicator, NoSQL table and column family");
+      }
+      int i = 0;
+      String enrichmentType = (String) args.get(i++);
+      String indicator = (String) args.get(i++);
+      String table = (String) args.get(i++);
+      String cf = (String) args.get(i++);
+      if(enrichmentType == null || indicator == null) {
+        return new HashMap<String, Object>();
+      }
+      final Table key = new Table(table, cf);
+      EnrichmentLookup lookup = null;
+      try {
+        // Build the lookup for this table/column family only once and reuse it afterwards.
+        lookup = enrichmentCollateralCache.get(key, () -> {
+                  HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name);
+                  return new EnrichmentLookup(hTable, key.columnFamily, tracker);
+                }
+        );
+      } catch (ExecutionException e) {
+        LOG.error("Unable to retrieve enrichmentLookup: " + e.getMessage(), e);
+        return new HashMap<String, Object>();
+      }
+      EnrichmentLookup.HBaseContext hbaseContext = new EnrichmentLookup.HBaseContext(lookup.getTable(), cf);
+      try {
+        LookupKV<EnrichmentKey, EnrichmentValue> kv = lookup.get(new EnrichmentKey(enrichmentType, indicator), hbaseContext, true);
+        if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
+          return kv.getValue().getMetadata();
+        }
+        return new HashMap<String, Object>();
+      } catch (IOException e) {
+        LOG.error("Unable to call get: " + e.getMessage(), e);
+        return new HashMap<String, Object>();
+      }
+    }
+
+    /**
+     * Sets up the shared table provider and access tracker from the global config.
+     * The function is marked initialized even when tracker creation fails; the failure is
+     * only logged.
+     *
+     * @param context the Stellar context supplying the global config
+     */
+    @Override
+    public void initialize(Context context) {
+      try {
+        Map<String, Object> config = getConfig(context);
+        initializeProvider(config);
+        initializeTracker(config, provider);
+      } catch (IOException e) {
+        LOG.error("Unable to initialize ENRICHMENT.GET: " + e.getMessage(), e);
+      }
+      finally{
+        initialized = true;
+      }
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 35fe7ce..48ae918 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
+import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
@@ -58,7 +60,6 @@ import java.util.Properties;
import java.util.Set;
public class EnrichmentIntegrationTest extends BaseIntegrationTest {
-
private static final String SRC_IP = "ip_src_addr";
private static final String DST_IP = "ip_dst_addr";
private static final String MALICIOUS_IP_TYPE = "malicious_ip";
@@ -80,11 +81,6 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
}
}
-
-
-
-
-
@Test
public void test() throws Exception {
final EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs();
@@ -112,11 +108,22 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
add(new KafkaWithZKComponent.Topic(Constants.INDEXING_TOPIC, 1));
}});
-
+ String globalConfigStr = null;
+ {
+ File globalConfig = new File(new File(TestConstants.SAMPLE_CONFIG_PATH), "global.json");
+ Map<String, Object> config = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() {
+ });
+ config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, Provider.class.getName());
+ config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM");
+ config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName);
+ config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf);
+ globalConfigStr = JSONUtils.INSTANCE.toJSON(config, true);
+ }
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
- .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
- .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
+ .withGlobalConfig(globalConfigStr)
+ .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+ ;
//create MockHBaseTables
final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
@@ -131,6 +138,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
)
);
}});
+
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(fluxPath))
.withTopologyName("test")
@@ -277,14 +285,28 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
Assert.assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
, PLAYFUL_ENRICHMENT.get("orientation")
);
+ Assert.assertEquals(indexedDoc.get("src_classification.orientation")
+ , PLAYFUL_ENRICHMENT.get("orientation"));
+ Assert.assertEquals(indexedDoc.get("is_src_malicious")
+ , true);
}
else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) {
Assert.assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
, PLAYFUL_ENRICHMENT.get("orientation")
);
+ Assert.assertEquals(indexedDoc.get("dst_classification.orientation")
+ , PLAYFUL_ENRICHMENT.get("orientation"));
+
+ }
+ if(!indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
+ Assert.assertEquals(indexedDoc.get("is_src_malicious")
+ , false);
}
}
-
+ else {
+ Assert.assertEquals(indexedDoc.get("is_src_malicious")
+ , false);
+ }
}
private static void threatIntelValidation(Map<String, Object> indexedDoc) {
if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index 97effac..11b30e7 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -78,7 +78,11 @@ public class ConfigUploadComponent implements InMemoryComponent {
try {
final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
- if(globalConfigPath != null) {
+ if(globalConfigPath != null
+ || parserConfigsPath != null
+ || enrichmentConfigsPath != null
+ || profilerConfigPath != null
+ ) {
uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, profilerConfigPath, zookeeperUrl);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
new file mode 100644
index 0000000..1c0838f
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.EnrichmentLookup;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.test.mock.MockHTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+public class SimpleHBaseEnrichmentFunctionsTest { // Exercises the ENRICHMENT_EXISTS and ENRICHMENT_GET Stellar functions against a MockHTable.
+ private final String hbaseTableName = "enrichments"; // name the mock table is cached under; the Stellar expressions below pass the same name
+ private static final String ENRICHMENT_TYPE = "et"; // enrichment type of every key loaded in setup(); matches the 'et' literal in the expressions
+ private String cf = "cf"; // column family used when creating and loading the mock table
+ private static Context context; // Stellar context; static, but reassigned by setup() before every test
+ // TableProvider that returns tables from the MockHTable.Provider cache; setup() puts its class name into the global config.
+ public static class TP implements TableProvider {
+
+ @Override
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return MockHTable.Provider.getFromCache(tableName); // config is unused; the lookup is by table name only
+ }
+ }
+ // Runs before each test: caches the mock table, loads five enrichments into it, and builds the Stellar context.
+ @Before
+ public void setup() throws Exception {
+ // Add the mock table to MockHTable.Provider's cache under hbaseTableName (TP.getTable reads from that cache).
+ final MockHTable hbaseTable = (MockHTable) MockHTable.Provider.addToCache(hbaseTableName, cf);
+ EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ // indicator0..indicator4, each with one "key<i>" -> "value<i>" entry
+ for(int i = 0;i < 5;++i) {
+ add(new LookupKV<>(new EnrichmentKey(ENRICHMENT_TYPE, "indicator" + i)
+ , new EnrichmentValue(ImmutableMap.of("key" + i, "value" + i))
+ )
+ );
+ }
+ }});
+ context = new Context.Builder() // global config maps TABLE_PROVIDER_TYPE_CONF to TP's class name
+ .with( Context.Capabilities.GLOBAL_CONFIG
+ , () -> ImmutableMap.of( SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF
+ , TP.class.getName()
+ )
+ )
+ .build();
+ }
+ public Object run(String rule, Map<String, Object> variables) throws Exception { // asserts the rule validates, then evaluates it and returns the result
+ StellarProcessor processor = new StellarProcessor();
+ Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+ return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context); // variables are resolved by name from the supplied map
+ }
+ // indicator0 was loaded in setup(), so ENRICHMENT_EXISTS must return Boolean true.
+ @Test
+ public void testExists() throws Exception {
+ String stellar = "ENRICHMENT_EXISTS('et', indicator, 'enrichments', 'cf')";
+ Object result = run(stellar, ImmutableMap.of("indicator", "indicator0"));
+ Assert.assertTrue(result instanceof Boolean);
+ Assert.assertTrue((Boolean)result);
+ }
+ // indicator7 is outside the loaded range (0..4), so ENRICHMENT_EXISTS must return Boolean false.
+ @Test
+ public void testNotExists() throws Exception {
+ String stellar = "ENRICHMENT_EXISTS('et', indicator, 'enrichments', 'cf')";
+ Object result = run(stellar, ImmutableMap.of("indicator", "indicator7"));
+ Assert.assertTrue(result instanceof Boolean);
+ Assert.assertFalse((Boolean)result);
+ }
+ @Test // a hit: ENRICHMENT_GET returns a Map containing the value stored for indicator0
+ public void testSuccessfulGet() throws Exception {
+ String stellar = "ENRICHMENT_GET('et', indicator, 'enrichments', 'cf')";
+ Object result = run(stellar, ImmutableMap.of("indicator", "indicator0"));
+ Assert.assertTrue(result instanceof Map);
+ Map<String, Object> out = (Map<String, Object>) result;
+ Assert.assertEquals("value0", out.get("key0"));
+ }
+ // A miss: ENRICHMENT_GET for an indicator that was never loaded is expected to return an empty Map, not null.
+ @Test
+ public void testUnsuccessfulGet() throws Exception {
+ String stellar = "ENRICHMENT_GET('et', indicator, 'enrichments', 'cf')";
+ Object result = run(stellar, ImmutableMap.of("indicator", "indicator7"));
+ Assert.assertTrue(result instanceof Map);
+ Map<String, Object> out = (Map<String, Object>) result;
+ Assert.assertTrue(out.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
index 5c7adf6..c053cdc 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
@@ -21,6 +21,12 @@
"foo": "1 + 1"
}
,"ALL_CAPS" : "TO_UPPER(source.type)"
+ ,"src_enrichment" : {
+ "src_classification" : "ENRICHMENT_GET('playful_classification', ip_src_addr, 'enrichments', 'cf')"
+ }
+ ,"dst_enrichment" : {
+ "dst_classification" : "ENRICHMENT_GET('playful_classification', ip_dst_addr, 'enrichments', 'cf')"
+ }
}
}
}
@@ -42,6 +48,7 @@
"stellar" : {
"config" : {
"bar" : "TO_UPPER(source.type)"
+ ,"is_src_malicious" : "ENRICHMENT_EXISTS('malicious_ip', ip_src_addr, 'threat_intel', 'cf')"
}
}
},
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index f2156a7..34dff11 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -100,6 +100,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
protected void initializeStellar() {
this.stellarContext = new Context.Builder()
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) // wrapped in a lambda, so getConfigurations() is not called at build time
.build();
StellarFunctions.initialize(stellarContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
index de08cf7..132be45 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
@@ -94,6 +94,8 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
joinStreamIds.add("geo:");
joinStreamIds.add("stellar:");
joinStreamIds.add("stellar:numeric");
+ joinStreamIds.add("stellar:dst_enrichment");
+ joinStreamIds.add("stellar:src_enrichment");
joinStreamIds.add("host:");
joinStreamIds.add("hbaseEnrichment:");
joinStreamIds.add("message:");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index 723aa71..69a1f67 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -46,6 +46,7 @@ import java.util.*;
*/
public class MockHTable implements HTableInterface {
+
public static class Provider implements Serializable {
private static Map<String, HTableInterface> _cache = new HashMap<>();
public HTableInterface getTable(Configuration config, String tableName) throws IOException {