You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/13 22:39:01 UTC

incubator-metron git commit: METRON-397: Add a stellar function to interact with the HBase enrichment table closes apache/incubator-metron#234

Repository: incubator-metron
Updated Branches:
  refs/heads/master ef3e9fa5e -> 6fb85e124


METRON-397: Add a stellar function to interact with the HBase enrichment table closes apache/incubator-metron#234


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/6fb85e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/6fb85e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/6fb85e12

Branch: refs/heads/master
Commit: 6fb85e124d87581c37b79ea105570f4f85b59384
Parents: ef3e9fa
Author: cstella <ce...@gmail.com>
Authored: Tue Sep 13 18:38:51 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Sep 13 18:38:51 2016 -0400

----------------------------------------------------------------------
 .../org/apache/metron/common/dsl/Context.java   |  21 +-
 .../common/dsl/functions/DateFunctions.java     |   1 +
 .../adapters/stellar/StellarAdapter.java        |  10 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |   1 +
 .../accesstracker/AccessTrackerCreator.java     |  27 ++
 .../lookup/accesstracker/AccessTrackers.java    |  39 +++
 .../PersistentBloomTrackerCreator.java          | 101 +++++++
 .../stellar/SimpleHBaseEnrichmentFunctions.java | 275 +++++++++++++++++++
 .../integration/EnrichmentIntegrationTest.java  |  42 ++-
 .../components/ConfigUploadComponent.java       |   6 +-
 .../SimpleHBaseEnrichmentFunctionsTest.java     | 114 ++++++++
 .../main/config/zookeeper/enrichments/test.json |   7 +
 .../apache/metron/parsers/bolt/ParserBolt.java  |   1 +
 .../test/bolt/BaseEnrichmentBoltTest.java       |   2 +
 .../org/apache/metron/test/mock/MockHTable.java |   1 +
 15 files changed, 630 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index aa8c9a2..bd43713 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -27,12 +27,13 @@ public class Context implements Serializable {
   public interface Capability {
     Object get();
   }
-
+  
   public enum Capabilities {
-    HBASE_PROVIDER,
-    ZOOKEEPER_CLIENT,
-    SERVICE_DISCOVERER,
-    GLOBAL_CONFIG
+    SENSOR_CONFIG
+    , HBASE_PROVIDER
+    , GLOBAL_CONFIG
+    , ZOOKEEPER_CLIENT
+    , SERVICE_DISCOVERER;
   }
 
   public static class Builder {
@@ -48,6 +49,14 @@ public class Context implements Serializable {
       capabilityMap.put(s.toString(), capability);
       return this;
     }
+    
+    public Builder withAll(Map<String, Object> externalConfig) {
+      for(Map.Entry<String, Object> entry : externalConfig.entrySet()) {
+
+        capabilityMap.put(entry.getKey(), () -> entry.getValue());
+      }
+      return this;
+    }
 
     public Context build() {
       return new Context(capabilityMap);
@@ -89,4 +98,4 @@ public class Context implements Serializable {
   public void addCapability(Enum<?> s, Capability capability) {
     this.capabilities.put(s.toString(), capability);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
index e28afd3..d83e468 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DateFunctions.java
@@ -109,6 +109,7 @@ public class DateFunctions {
     return sdf.parse(date).getTime();
   }
 
+
   /**
    * Stellar Function: TO_EPOCH_TIMESTAMP
    */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
index 3ba7755..9fa7363 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
@@ -90,7 +90,15 @@ public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable
         if(kv.getValue() instanceof String) {
           String stellarStatement = (String) kv.getValue();
           Object o = processor.parse(stellarStatement, resolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
-          message.put(kv.getKey(), o);
+          if(o != null && o instanceof Map) {
+            for(Map.Entry<Object, Object> valueKv : ((Map<Object, Object>)o).entrySet()) {
+              String newKey = ((kv.getKey().length() > 0)?kv.getKey() + "." : "" )+ valueKv.getKey();
+              message.put(newKey, valueKv.getValue());
+            }
+          }
+          else {
+            message.put(kv.getKey(), o);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 6fad0f8..059817a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -158,6 +158,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
   protected void initializeStellar() {
     stellarContext = new Context.Builder()
                          .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+                         .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
                          .build();
     StellarFunctions.initialize(stellarContext);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
new file mode 100644
index 0000000..f4d0c4c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackerCreator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface AccessTrackerCreator {
+  public AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
new file mode 100644
index 0000000..7dad6eb
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/AccessTrackers.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+
+public enum AccessTrackers implements AccessTrackerCreator {
+   NOOP((config, provider) -> new NoopAccessTracker())
+  ,PERSISTENT_BLOOM( new PersistentBloomTrackerCreator());
+  AccessTrackerCreator creator;
+  AccessTrackers(AccessTrackerCreator creator) {
+    this.creator = creator;
+  }
+
+
+  @Override
+  public AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException {
+    return creator.create(config, provider);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
new file mode 100644
index 0000000..02d0c7f
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/PersistentBloomTrackerCreator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.lookup.accesstracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+public class PersistentBloomTrackerCreator implements AccessTrackerCreator {
+
+  public static class Config {
+    public static final String PERSISTENT_BLOOM_TABLE = "pbat.table";
+    public static final String PERSISTENT_BLOOM_CF = "pbat.cf";
+    public static final String PERSISTENT_BLOOM_FP = "pbat.false_positive";
+    public static final String PERSISTENT_BLOOM_EI = "pbat.expected_insertions";
+    public static final String PERSISTENT_BLOOM_MS_BETWEEN_PERSISTS = "pbat.ms_between_persists";
+    public static final long MS_IN_HOUR = 10000*60*60;
+    private String hBaseTable;
+    private String hBaseCF;
+    private double falsePositiveRate = 0.03;
+    private int expectedInsertions = 100000;
+    private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
+    public Config(Map<String, Object> config) {
+      hBaseTable = (String) config.get(PERSISTENT_BLOOM_TABLE);
+      hBaseCF = (String) config.get(PERSISTENT_BLOOM_CF);
+      Object fpObj = config.get(PERSISTENT_BLOOM_FP);
+      if (fpObj != null) {
+        falsePositiveRate = ConversionUtils.convert(fpObj, Double.class);
+      }
+      Object eiObj = config.get(PERSISTENT_BLOOM_EI);
+      if (eiObj != null) {
+        expectedInsertions = ConversionUtils.convert(eiObj, Integer.class);
+      }
+      Object msObj = config.get(PERSISTENT_BLOOM_MS_BETWEEN_PERSISTS);
+      if(msObj != null) {
+        millisecondsBetweenPersists = ConversionUtils.convert(msObj, Long.class);
+      }
+    }
+
+    public String getHBaseTable() {
+      return hBaseTable;
+    }
+
+    public String getHBaseCF() {
+      return hBaseCF;
+    }
+
+    public double getFalsePositiveRate() {
+      return falsePositiveRate;
+    }
+
+    public int getExpectedInsertions() {
+      return expectedInsertions;
+    }
+
+
+    public long getMillisecondsBetweenPersists() {
+      return millisecondsBetweenPersists;
+    }
+  }
+
+  @Override
+  public AccessTracker create(Map<String, Object> config, TableProvider provider) throws IOException {
+    Config patConfig = new Config(config);
+    String hbaseTable = patConfig.getHBaseTable();
+    int expectedInsertions = patConfig.getExpectedInsertions();
+    double falsePositives = patConfig.getFalsePositiveRate();
+    long millisecondsBetweenPersist = patConfig.getMillisecondsBetweenPersists();
+    BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
+    Configuration hbaseConfig = HBaseConfiguration.create();
+
+    AccessTracker ret = new PersistentAccessTracker( hbaseTable
+                                                   , UUID.randomUUID().toString()
+                                                   , provider.getTable(hbaseConfig, hbaseTable)
+                                                   , patConfig.getHBaseCF()
+                                                   , bat
+                                                   , millisecondsBetweenPersist
+                                                   );
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
new file mode 100644
index 0000000..6d21d03
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctions.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.EnrichmentLookup;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTracker;
+import org.apache.metron.enrichment.lookup.accesstracker.AccessTrackers;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class SimpleHBaseEnrichmentFunctions {
+  private static final Logger LOG = Logger.getLogger(SimpleHBaseEnrichmentFunctions.class);
+  public static final String ACCESS_TRACKER_TYPE_CONF = "accessTracker";
+  public static final String TABLE_PROVIDER_TYPE_CONF = "tableProviderImpl";
+  private static AccessTracker tracker;
+  private static TableProvider provider;
+
+
+  private static class Table {
+    String name;
+    String columnFamily;
+
+    public Table(String name, String columnFamily) {
+      this.name = name;
+      this.columnFamily = columnFamily;
+    }
+
+    @Override
+    public String toString() {
+      return "Table{" +
+              "name='" + name + '\'' +
+              ", columnFamily='" + columnFamily + '\'' +
+              '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Table table = (Table) o;
+
+      if (name != null ? !name.equals(table.name) : table.name != null) return false;
+      return columnFamily != null ? columnFamily.equals(table.columnFamily) : table.columnFamily == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = name != null ? name.hashCode() : 0;
+      result = 31 * result + (columnFamily != null ? columnFamily.hashCode() : 0);
+      return result;
+    }
+  }
+
+
+  private static Map<String, Object> getConfig(Context context) {
+    return (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG).orElse(new HashMap<>());
+  }
+
+  private static synchronized void initializeTracker(Map<String, Object> config, TableProvider provider) throws IOException {
+    if(tracker == null) {
+      String accessTrackerType = (String) config.getOrDefault(ACCESS_TRACKER_TYPE_CONF, AccessTrackers.NOOP.toString());
+      AccessTrackers trackers = AccessTrackers.valueOf(accessTrackerType);
+      tracker = trackers.create(config, provider);
+    }
+  }
+
+  private static TableProvider createProvider(String tableProviderClass) {
+    try {
+      Class<? extends TableProvider> providerClazz = (Class<? extends TableProvider>) Class.forName(tableProviderClass);
+      return providerClazz.newInstance();
+    } catch (Exception e) {
+      return new HTableProvider();
+    }
+  }
+
+  private static synchronized void initializeProvider( Map<String, Object> config) {
+    if(provider != null) {
+      return ;
+    }
+    else {
+      String tableProviderClass = (String) config.getOrDefault(TABLE_PROVIDER_TYPE_CONF, HTableProvider.class.getName());
+      provider = createProvider(tableProviderClass);
+    }
+  }
+
+  @Stellar(name="EXISTS"
+          ,namespace="ENRICHMENT"
+          ,description="Interrogates the HBase table holding the simple hbase enrichment data and returns whether the" +
+          " enrichment type and indicator are in the table."
+          ,params = {
+                      "enrichment_type - The enrichment type"
+                     ,"indicator - The string indicator to look up"
+                     ,"nosql_table - The NoSQL Table to use"
+                     ,"column_family - The Column Family to use"
+                    }
+          ,returns = "True if the enrichment indicator exists and false otherwise"
+          )
+  public static class EnrichmentExists implements StellarFunction {
+    boolean initialized = false;
+    private static Cache<Table, EnrichmentLookup> enrichmentCollateralCache = CacheBuilder.newBuilder()
+                                                                                        .build();
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(!initialized) {
+        return false;
+      }
+      if(args.size() < 2) {
+        throw new IllegalStateException("Requires at least an enrichment type and indicator");
+      }
+      int i = 0;
+      String enrichmentType = (String) args.get(i++);
+      String indicator = (String) args.get(i++);
+      String table = (String) args.get(i++);
+      String cf = (String) args.get(i++);
+      if(enrichmentType == null || indicator == null) {
+        return false;
+      }
+      final Table key = new Table(table, cf);
+      EnrichmentLookup lookup = null;
+      try {
+        lookup = enrichmentCollateralCache.get(key, () -> {
+            HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name);
+            return new EnrichmentLookup(hTable, key.columnFamily, tracker);
+          }
+        );
+      } catch (ExecutionException e) {
+        LOG.error("Unable to retrieve enrichmentLookup: " + e.getMessage(), e);
+        return false;
+      }
+      EnrichmentLookup.HBaseContext hbaseContext = new EnrichmentLookup.HBaseContext(lookup.getTable(), cf);
+      try {
+        return lookup.exists(new EnrichmentKey(enrichmentType, indicator), hbaseContext, true);
+      } catch (IOException e) {
+        LOG.error("Unable to call exists: " + e.getMessage(), e);
+        return false;
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+      try {
+        Map<String, Object> config = getConfig(context);
+        initializeProvider(config);
+        initializeTracker(config, provider);
+      } catch (IOException e) {
+        LOG.error("Unable to initialize ENRICHMENT.EXISTS: " + e.getMessage(), e);
+      }
+      finally{
+        initialized = true;
+      }
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+  }
+
+
+
+  @Stellar(name="GET"
+          ,namespace="ENRICHMENT"
+          ,description="Interrogates the HBase table holding the simple hbase enrichment data and retrieves the " +
+          "tabular value associated with the enrichment type and indicator."
+          ,params = {
+                      "enrichment_type - The enrichment type"
+                     ,"indicator - The string indicator to look up"
+                     ,"nosql_table - The NoSQL Table to use"
+                     ,"column_family - The Column Family to use"
+                    }
+          ,returns = "A Map associated with the indicator and enrichment type.  Empty otherwise."
+          )
+  public static class EnrichmentGet implements StellarFunction {
+    boolean initialized = false;
+    private static Cache<Table, EnrichmentLookup> enrichmentCollateralCache = CacheBuilder.newBuilder()
+                                                                                        .build();
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(!initialized) {
+        return false;
+      }
+      if(args.size() < 2) {
+        throw new IllegalStateException("Requires at least an enrichment type and indicator");
+      }
+      int i = 0;
+      String enrichmentType = (String) args.get(i++);
+      String indicator = (String) args.get(i++);
+      String table = (String) args.get(i++);
+      String cf = (String) args.get(i++);
+      if(enrichmentType == null || indicator == null) {
+        return new HashMap<String, Object>();
+      }
+      final Table key = new Table(table, cf);
+      EnrichmentLookup lookup = null;
+      try {
+        lookup = enrichmentCollateralCache.get(key, () -> {
+                  HTableInterface hTable = provider.getTable(HBaseConfiguration.create(), key.name);
+                  return new EnrichmentLookup(hTable, key.columnFamily, tracker);
+                }
+        );
+      } catch (ExecutionException e) {
+        LOG.error("Unable to retrieve enrichmentLookup: " + e.getMessage(), e);
+        return new HashMap<String, Object>();
+      }
+      EnrichmentLookup.HBaseContext hbaseContext = new EnrichmentLookup.HBaseContext(lookup.getTable(), cf);
+      try {
+        LookupKV<EnrichmentKey, EnrichmentValue> kv = lookup.get(new EnrichmentKey(enrichmentType, indicator), hbaseContext, true);
+        if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
+          return kv.getValue().getMetadata();
+        }
+        return new HashMap<String, Object>();
+      } catch (IOException e) {
+        LOG.error("Unable to call exists: " + e.getMessage(), e);
+        return new HashMap<String, Object>();
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+      try {
+        Map<String, Object> config = getConfig(context);
+        initializeProvider(config);
+        initializeTracker(config, provider);
+      } catch (IOException e) {
+        LOG.error("Unable to initialize ENRICHMENT.GET: " + e.getMessage(), e);
+      }
+      finally{
+        initialized = true;
+      }
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 35fe7ce..48ae918 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
+import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
@@ -58,7 +60,6 @@ import java.util.Properties;
 import java.util.Set;
 
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
-
   private static final String SRC_IP = "ip_src_addr";
   private static final String DST_IP = "ip_dst_addr";
   private static final String MALICIOUS_IP_TYPE = "malicious_ip";
@@ -80,11 +81,6 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     }
   }
 
-
-
-
-
-
   @Test
   public void test() throws Exception {
     final EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs();
@@ -112,11 +108,22 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
       add(new KafkaWithZKComponent.Topic(Constants.INDEXING_TOPIC, 1));
     }});
-
+    String globalConfigStr = null;
+    {
+      File globalConfig = new File(new File(TestConstants.SAMPLE_CONFIG_PATH), "global.json");
+      Map<String, Object> config = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() {
+      });
+      config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, Provider.class.getName());
+      config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM");
+      config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName);
+      config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf);
+      globalConfigStr = JSONUtils.INSTANCE.toJSON(config, true);
+    }
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
-            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
+            .withGlobalConfig(globalConfigStr)
+            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            ;
 
     //create MockHBaseTables
     final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
@@ -131,6 +138,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
                         )
          );
     }});
+
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath))
             .withTopologyName("test")
@@ -277,14 +285,28 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
         Assert.assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
                 , PLAYFUL_ENRICHMENT.get("orientation")
         );
+        Assert.assertEquals(indexedDoc.get("src_classification.orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation"));
+        Assert.assertEquals(indexedDoc.get("is_src_malicious")
+                , true);
       }
       else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) {
         Assert.assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
                 , PLAYFUL_ENRICHMENT.get("orientation")
         );
+        Assert.assertEquals(indexedDoc.get("dst_classification.orientation")
+                , PLAYFUL_ENRICHMENT.get("orientation"));
+
+      }
+      if(!indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("is_src_malicious")
+                , false);
       }
     }
-
+    else {
+      Assert.assertEquals(indexedDoc.get("is_src_malicious")
+              , false);
+    }
   }
   private static void threatIntelValidation(Map<String, Object> indexedDoc) {
     if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index 97effac..11b30e7 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -78,7 +78,11 @@ public class ConfigUploadComponent implements InMemoryComponent {
     try {
       final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
 
-      if(globalConfigPath != null) {
+      if(globalConfigPath != null
+      || parserConfigsPath != null
+      || enrichmentConfigsPath != null
+      || profilerConfigPath != null
+        ) {
         uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, profilerConfigPath, zookeeperUrl);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
new file mode 100644
index 0000000..1c0838f
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.EnrichmentLookup;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.test.mock.MockHTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+public class SimpleHBaseEnrichmentFunctionsTest {
+  private final String hbaseTableName = "enrichments";
+  private static final String ENRICHMENT_TYPE = "et";
+  private String cf = "cf";
+  private static Context context;
+
+  public static class TP implements TableProvider {
+
+    @Override
+    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+      return MockHTable.Provider.getFromCache(tableName);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+
+    final MockHTable hbaseTable = (MockHTable) MockHTable.Provider.addToCache(hbaseTableName, cf);
+    EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
+      for(int i = 0;i < 5;++i) {
+        add(new LookupKV<>(new EnrichmentKey(ENRICHMENT_TYPE, "indicator" + i)
+                        , new EnrichmentValue(ImmutableMap.of("key" + i, "value" + i))
+                )
+        );
+      }
+    }});
+    context = new Context.Builder()
+            .with( Context.Capabilities.GLOBAL_CONFIG
+                 , () -> ImmutableMap.of( SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF
+                                        , TP.class.getName()
+                                        )
+                 )
+            .build();
+  }
+  public Object run(String rule, Map<String, Object> variables) throws Exception {
+    StellarProcessor processor = new StellarProcessor();
+    Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+    return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+  }
+
+  @Test
+  public void testExists() throws Exception {
+    String stellar = "ENRICHMENT_EXISTS('et', indicator, 'enrichments', 'cf')";
+    Object result = run(stellar, ImmutableMap.of("indicator", "indicator0"));
+    Assert.assertTrue(result instanceof Boolean);
+    Assert.assertTrue((Boolean)result);
+  }
+
+  @Test
+  public void testNotExists() throws Exception {
+    String stellar = "ENRICHMENT_EXISTS('et', indicator, 'enrichments', 'cf')";
+    Object result = run(stellar, ImmutableMap.of("indicator", "indicator7"));
+    Assert.assertTrue(result instanceof Boolean);
+    Assert.assertFalse((Boolean)result);
+  }
+  @Test
+  public void testSuccessfulGet() throws Exception {
+    String stellar = "ENRICHMENT_GET('et', indicator, 'enrichments', 'cf')";
+    Object result = run(stellar, ImmutableMap.of("indicator", "indicator0"));
+    Assert.assertTrue(result instanceof Map);
+    Map<String, Object> out = (Map<String, Object>) result;
+    Assert.assertEquals("value0", out.get("key0"));
+  }
+
+  @Test
+  public void testUnsuccessfulGet() throws Exception {
+    String stellar = "ENRICHMENT_GET('et', indicator, 'enrichments', 'cf')";
+    Object result = run(stellar, ImmutableMap.of("indicator", "indicator7"));
+    Assert.assertTrue(result instanceof Map);
+    Map<String, Object> out = (Map<String, Object>) result;
+    Assert.assertTrue(out.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
index 5c7adf6..c053cdc 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
@@ -21,6 +21,12 @@
                       "foo": "1 + 1"
                       }
           ,"ALL_CAPS" : "TO_UPPER(source.type)"
+          ,"src_enrichment" : {
+            "src_classification" : "ENRICHMENT_GET('playful_classification', ip_src_addr, 'enrichments', 'cf')"
+          }
+          ,"dst_enrichment" : {
+            "dst_classification" : "ENRICHMENT_GET('playful_classification', ip_dst_addr, 'enrichments', 'cf')"
+          }
         }
       }
     }
@@ -42,6 +48,7 @@
       "stellar" : {
         "config" : {
           "bar" : "TO_UPPER(source.type)"
+         ,"is_src_malicious" : "ENRICHMENT_EXISTS('malicious_ip', ip_src_addr, 'threat_intel', 'cf')"
         }
       }
     },

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index f2156a7..34dff11 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -100,6 +100,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   protected void initializeStellar() {
     this.stellarContext = new Context.Builder()
                                 .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+                                .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
                                 .build();
     StellarFunctions.initialize(stellarContext);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
index de08cf7..132be45 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
@@ -94,6 +94,8 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
     joinStreamIds.add("geo:");
     joinStreamIds.add("stellar:");
     joinStreamIds.add("stellar:numeric");
+    joinStreamIds.add("stellar:dst_enrichment");
+    joinStreamIds.add("stellar:src_enrichment");
     joinStreamIds.add("host:");
     joinStreamIds.add("hbaseEnrichment:");
     joinStreamIds.add("message:");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6fb85e12/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index 723aa71..69a1f67 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -46,6 +46,7 @@ import java.util.*;
  */
 public class MockHTable implements HTableInterface {
 
+
   public static class Provider implements Serializable {
     private static Map<String, HTableInterface> _cache = new HashMap<>();
     public HTableInterface getTable(Configuration config, String tableName) throws IOException {