You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/29 15:00:20 UTC

incubator-metron git commit: METRON-178 Expose the filter capability in the Parser topologies. closes apache/incubator-metron#129

Repository: incubator-metron
Updated Branches:
  refs/heads/master ed96afe08 -> ddc6c07c6


METRON-178 Expose the filter capability in the Parser topologies. closes apache/incubator-metron#129


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ddc6c07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ddc6c07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ddc6c07c

Branch: refs/heads/master
Commit: ddc6c07c659d7fc49d1d832bbe46c44624ee9eee
Parents: ed96afe
Author: cstella <ce...@gmail.com>
Authored: Sun May 29 11:00:12 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sun May 29 11:00:12 2016 -0400

----------------------------------------------------------------------
 .../common/bolt/ConfiguredParserBolt.java       | 13 +++-
 .../configuration/SensorParserConfig.java       | 13 ++++
 .../metron/common/utils/ReflectionUtils.java    | 11 ++-
 .../common/bolt/ConfiguredParserBoltTest.java   |  2 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  | 27 ++++---
 .../parsers/filters/AbstractMessageFilter.java  | 30 ++++++++
 .../parsers/filters/BroMessageFilter.java       | 77 +++++++++++---------
 .../apache/metron/parsers/filters/Filters.java  | 58 +++++++++++++++
 .../parsers/filters/GenericMessageFilter.java   | 11 ++-
 .../metron/parsers/filters/QueryFilter.java     | 57 +++++++++++++++
 .../metron/parsers/interfaces/Configurable.java | 26 +++++++
 .../parsers/interfaces/MessageFilter.java       |  6 +-
 .../parsers/interfaces/MessageParser.java       |  4 +-
 .../org/apache/metron/filters/FiltersTest.java  | 57 +++++++++++++++
 .../metron/parsers/bolt/ParserBoltTest.java     | 47 ++++++++++++
 15 files changed, 385 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 543f87b..feab40e 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -22,6 +22,7 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
 
 import java.io.IOException;
 
@@ -30,15 +31,23 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
   private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
 
   protected final ParserConfigurations configurations = new ParserConfigurations();
-
-  public ConfiguredParserBolt(String zookeeperUrl) {
+  private String sensorType;
+  public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
     super(zookeeperUrl);
+    this.sensorType = sensorType;
+  }
+
+  protected SensorParserConfig getSensorParserConfig() {
+    return getConfigurations().getSensorParserConfig(sensorType);
   }
 
   public ParserConfigurations getConfigurations() {
     return configurations;
   }
 
+  public String getSensorType() {
+    return sensorType;
+  }
   @Override
   public void loadConfig() {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 106c55a..5d1bda9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
 public class SensorParserConfig {
 
   private String parserClassName;
+  private String filterClassName;
   private String sensorTopic;
   private Map<String, Object> parserConfig = new HashMap<>();
   private List<FieldTransformer> fieldTransformations = new ArrayList<>();
@@ -41,6 +42,14 @@ public class SensorParserConfig {
     this.fieldTransformations = fieldTransformations;
   }
 
+  public String getFilterClassName() {
+    return filterClassName;
+  }
+
+  public void setFilterClassName(String filterClassName) {
+    this.filterClassName = filterClassName;
+  }
+
   public String getParserClassName() {
     return parserClassName;
   }
@@ -86,6 +95,7 @@ public class SensorParserConfig {
   public String toString() {
     return "SensorParserConfig{" +
             "parserClassName='" + parserClassName + '\'' +
+            ", filterClassName='" + filterClassName + '\'' +
             ", sensorTopic='" + sensorTopic + '\'' +
             ", parserConfig=" + parserConfig +
             ", fieldTransformations=" + fieldTransformations +
@@ -101,6 +111,8 @@ public class SensorParserConfig {
 
     if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
       return false;
+    if (getFilterClassName() != null ? !getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null)
+      return false;
     if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
       return false;
     if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
@@ -112,6 +124,7 @@ public class SensorParserConfig {
   @Override
   public int hashCode() {
     int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
+    result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
     result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
     result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
index 141221d..dadcb1b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -32,10 +32,9 @@ public class ReflectionUtils<T> {
     return instance;
   }
 
-  public static <T> T createInstance(String className) {
+  public static <T> T createInstance(Class<? extends T> clazz) {
     T instance;
     try {
-      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
       instance = clazz.getConstructor().newInstance();
     } catch (InstantiationException e) {
       throw new IllegalStateException("Unable to instantiate connector.", e);
@@ -45,6 +44,14 @@ public class ReflectionUtils<T> {
       throw new IllegalStateException("Unable to instantiate connector", e);
     } catch (NoSuchMethodException e) {
       throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+    }
+    return instance;
+  }
+  public static <T> T createInstance(String className) {
+    T instance;
+    try {
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      instance = createInstance(clazz);
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException("Unable to instantiate connector: class not found", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index f945dd0..3010ed8 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -43,7 +43,7 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
   public static class StandAloneConfiguredParserBolt extends ConfiguredParserBolt {
 
     public StandAloneConfiguredParserBolt(String zookeeperUrl) {
-      super(zookeeperUrl);
+      super(zookeeperUrl, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 2a64666..addeaab 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -23,6 +23,8 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.common.configuration.FieldTransformer;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.parsers.filters.GenericMessageFilter;
@@ -39,14 +41,12 @@ public class ParserBolt extends ConfiguredParserBolt {
 
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
-  private MessageFilter<JSONObject> filter = new GenericMessageFilter();
+  private MessageFilter<JSONObject> filter;
   private MessageWriter<JSONObject> writer;
-  private String sensorType;
 
   public ParserBolt(String zookeeperUrl, String sensorType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
-    super(zookeeperUrl);
+    super(zookeeperUrl, sensorType);
     this.parser = parser;
-    this.sensorType = sensorType;
     this.writer = writer;
   }
 
@@ -60,35 +60,44 @@ public class ParserBolt extends ConfiguredParserBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
     this.collector = collector;
+    if(getSensorParserConfig() == null) {
+      filter = new GenericMessageFilter();
+    }
+    else if(filter == null) {
+      filter = Filters.get(getSensorParserConfig().getFilterClassName()
+              , getSensorParserConfig().getParserConfig()
+      );
+    }
     parser.init();
     writer.init();
-    SensorParserConfig config = getConfigurations().getSensorParserConfig(sensorType);
+    SensorParserConfig config = getSensorParserConfig();
     if(config != null) {
       config.init();
     }
     else {
-      throw new IllegalStateException("Unable to retrieve a parser config for " + sensorType);
+      throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
     }
   }
 
+
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
     byte[] originalMessage = tuple.getBinary(0);
-    SensorParserConfig sensorParserConfig = getConfigurations().getSensorParserConfig(sensorType);
+    SensorParserConfig sensorParserConfig = getSensorParserConfig();
     try {
       if(sensorParserConfig != null) {
         List<JSONObject> messages = parser.parse(originalMessage);
         for (JSONObject message : messages) {
           if (parser.validate(message)) {
             if (filter != null && filter.emitTuple(message)) {
-              message.put(Constants.SENSOR_TYPE, sensorType);
+              message.put(Constants.SENSOR_TYPE, getSensorType());
               for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
                 if (handler != null) {
                   handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
                 }
               }
-              writer.write(sensorType, configurations, tuple, message);
+              writer.write(getSensorType(), configurations, tuple, message);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
new file mode 100644
index 0000000..8fa2af9
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class AbstractMessageFilter implements MessageFilter<JSONObject>, Serializable{
+  public AbstractMessageFilter(Map<String, Object> config) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index d026d08..2145dfe 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -24,39 +24,50 @@ import org.json.simple.JSONObject;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class BroMessageFilter implements MessageFilter<JSONObject>,
-				Serializable {
-
-	/**
-	 * Filter protocols based on whitelists and blacklists
-	 */
-	
-	private static final long serialVersionUID = -3824683649114625033L;
-	private String _key;
-	private final Set<String> _known_protocols;
-
-	 /**
-	 * @param  conf  Commons configuration for reading properties files
-	 * @param  key Key in a JSON mesage where the protocol field is located
-	 */
-	
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public BroMessageFilter(Configuration conf, String key) {
-		_key = key;
-		_known_protocols = new HashSet<>();
-		List known_protocols = conf.getList("source.known.protocols");
-		_known_protocols.addAll(known_protocols);
-	}
-
-	 /**
-	 * @param  message  JSON representation of a message with a protocol field
-	 * @return      False if message if filtered and True if message is not filtered
-	 */
-	
-	public boolean emitTuple(JSONObject message) {
-		String protocol = (String) message.get(_key);
-		return _known_protocols.contains(protocol);
-	}
+public class BroMessageFilter implements MessageFilter<JSONObject>{
+
+  /**
+   * Filter protocols based on whitelists and blacklists
+   */
+
+  private static final long serialVersionUID = -3824683649114625033L;
+  private String _key;
+  private final Set<String> _known_protocols;
+
+  public BroMessageFilter() {
+    _known_protocols = new HashSet<>();
+  }
+
+  public void configure(Map<String, Object> config) {
+    Object protocolsObj = config.get("bro.filter.source.known.protocols");
+    Object keyObj = config.get("bro.filter.source.key");
+    if(keyObj != null) {
+      _key = keyObj.toString();
+    }
+    if(protocolsObj != null) {
+      if(protocolsObj instanceof String) {
+        _known_protocols.clear();
+        _known_protocols.add(protocolsObj.toString());
+      }
+      else if(protocolsObj instanceof List) {
+        _known_protocols.clear();
+        for(Object o : (List)protocolsObj) {
+          _known_protocols.add(o.toString());
+        }
+      }
+    }
+  }
+
+  /**
+   * @param  message  JSON representation of a message with a protocol field
+   * @return      False if message if filtered and True if message is not filtered
+   */
+
+  public boolean emitTuple(JSONObject message) {
+    String protocol = (String) message.get(_key);
+    return _known_protocols.contains(protocol);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
new file mode 100644
index 0000000..fc9a49e
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+public enum Filters {
+   BRO(BroMessageFilter.class)
+  ,QUERY(QueryFilter.class)
+  ,DEFAULT(GenericMessageFilter.class)
+  ;
+  Class<? extends MessageFilter> clazz;
+  Filters(Class<? extends MessageFilter> clazz) {
+    this.clazz = clazz;
+  }
+  public static MessageFilter<JSONObject> get(String filterName, Map<String, Object> config) {
+    if(filterName == null || filterName.trim().isEmpty()) {
+      return new GenericMessageFilter();
+    }
+    Class<? extends MessageFilter> filterClass;
+    try {
+      Filters f = Filters.valueOf(filterName);
+      filterClass = f.clazz;
+    }
+    catch(Exception ex) {
+      try {
+        filterClass = (Class<? extends MessageFilter>) Class.forName(filterName);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to find class " + filterName, e);
+      }
+    }
+    MessageFilter<JSONObject> filter = ReflectionUtils.createInstance(filterClass);
+    filter.configure(config);
+    return filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
index 9defe32..e4524ae 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
@@ -21,14 +21,21 @@ import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;
+import java.util.Map;
 
-public class GenericMessageFilter implements MessageFilter<JSONObject>,
-				Serializable {
+public class GenericMessageFilter implements MessageFilter<JSONObject>{
 
 	private static final long serialVersionUID = 3626397212398318852L;
 
+	public GenericMessageFilter() {
+	}
+
 	public boolean emitTuple(JSONObject message) {
 		return true;
 	}
 
+	@Override
+	public void configure(Map<String, Object> config) {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
new file mode 100644
index 0000000..ee64fb4
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.query.MapVariableResolver;
+import org.apache.metron.common.query.PredicateProcessor;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class QueryFilter implements MessageFilter<JSONObject> {
+  public static final String QUERY_STRING_CONF = "filter.query";
+  private PredicateProcessor processor = new PredicateProcessor();
+  private String query;
+
+  public QueryFilter()
+  {
+
+  }
+
+  @Override
+  public void configure(Map<String, Object> config) {
+    Object o = config.get(QUERY_STRING_CONF);
+    if(o instanceof String) {
+      query= o.toString();
+    }
+
+    processor.validate(query, true);
+  }
+
+  @Override
+  public boolean emitTuple(JSONObject message) {
+    VariableResolver resolver = new MapVariableResolver(message);
+    return processor.parse(query, resolver);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
new file mode 100644
index 0000000..4a707e6
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.interfaces;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface Configurable extends Serializable {
+  void configure(Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index 2e5ab29..10a899b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -17,8 +17,10 @@
  */
 package org.apache.metron.parsers.interfaces;
 
-public interface MessageFilter<T> {
+import java.io.Serializable;
+import java.util.Map;
 
-	boolean emitTuple(T message);
+public interface MessageFilter<T> extends Configurable{
 
+	boolean emitTuple(T message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 81d1b1a..439f06d 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public interface MessageParser<T> extends Serializable {
-
-	void configure(Map<String, Object> parserConfig);
+public interface MessageParser<T> extends Configurable{
 	void init();
 	List<T> parse(byte[] rawMessage);
 	boolean validate(T message);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
new file mode 100644
index 0000000..9df0101
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.filters;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.filters.AbstractMessageFilter;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.filters.GenericMessageFilter;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FiltersTest {
+  @Test
+  public void testDefault() {
+    Assert.assertTrue(Filters.get("DEFAULT", null).emitTuple(null));
+    Assert.assertTrue(Filters.get(GenericMessageFilter.class.getName(), null).emitTuple(null));
+  }
+
+  @Test
+  public void testSingleQueryFilter() throws Exception {
+    {
+      Map<String, Object> config = new HashMap<String, Object>() {{
+        put("filter.query", "exists(foo)");
+      }};
+      MessageFilter<JSONObject> filter = Filters.get("QUERY", config);
+      Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1))));
+      Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1))));
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index b90e521..658d52e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.test.bolt.BaseBoltTest;
@@ -29,6 +31,7 @@ import org.json.simple.parser.JSONParser;
 import org.junit.Test;
 import org.mockito.Mock;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -100,4 +103,48 @@ public class ParserBoltTest extends BaseBoltTest {
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
 
+  /**
+   {
+    "filterClassName" : "QUERY"
+   ,"parserConfig" : {
+    "filter.query" : "exists(field1)"
+    }
+   }
+   */
+  @Multiline
+  public static String sensorParserConfig;
+  @Test
+  public void testFilter() throws Exception {
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+      @Override
+      protected SensorParserConfig getSensorParserConfig() {
+        try {
+          return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(writer, times(1)).init();
+    byte[] sampleBinary = "some binary message".getBytes();
+    JSONParser jsonParser = new JSONParser();
+    final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
+    final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
+    List<JSONObject> messages = new ArrayList<JSONObject>() {{
+      add(sampleMessage1);
+      add(sampleMessage2);
+    }};
+    final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
+    when(tuple.getBinary(0)).thenReturn(sampleBinary);
+    when(parser.parse(sampleBinary)).thenReturn(messages);
+    when(parser.validate(any(JSONObject.class))).thenReturn(true);
+    parserBolt.execute(tuple);
+    verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
+    verify(outputCollector, times(1)).ack(tuple);
+  }
 }